Phase 04 - Lesson 16

Construa um pipeline de visão completo — Projeto final

Um sistema de visão em produção é uma cadeia de modelos e regras costurada por contratos de dados. As peças já existem nesta fase; o projeto final as conecta de ponta a ponta.

Tipo: Build Linguagens: Python Pré-requisitos: Fase 4, Lições 01-15 Tempo: ~120 minutos

Objetivos de aprendizagem

  • Projetar um pipeline de visão em produção que detecta objetos, os classifica e emite JSON estruturado — com todos os caminhos de falha tratados
  • Conectar um detector (Mask R-CNN ou YOLO), um classificador (ConvNeXt-Tiny) e um contrato de dados (Pydantic) em um único serviço
  • Fazer benchmark do pipeline de ponta a ponta e identificar o primeiro gargalo (geralmente o pré-processamento, depois o detector)
  • Entregar um serviço FastAPI mínimo que aceita o upload de uma imagem, executa o pipeline e retorna detecções com classificações

O problema

Modelos de visão individuais são úteis; produtos de visão são cadeias deles. Uma auditoria de prateleira de varejo é um detector mais um classificador de produtos mais um pipeline de OCR de preços. A direção autônoma é um detector 2D mais um detector 3D mais um segmentador mais um rastreador mais um planejador. Uma pré-triagem médica é um segmentador mais um classificador de regiões mais uma interface para o clínico.

Conectar essas cadeias é a parte que separa um protótipo de ML de um produto. Toda interface entre modelos é um novo lugar para bugs. Toda transformação de coordenadas, toda normalização, todo redimensionamento de máscara é uma candidata a falha silenciosa. Um pipeline é tão forte quanto sua interface mais fraca.

Este projeto final monta o pipeline mínimo viável: detecção + classificação + saída estruturada + uma camada de serviço. Todo o resto da Fase 4 encaixa nesse esqueleto: troque o Mask R-CNN por YOLOv8, adicione uma cabeça de OCR, adicione um ramo de segmentação, adicione um rastreador. A arquitetura é estável; as peças são plugáveis.

O conceito

O pipeline

flowchart LR
    REQ["Requisição HTTP<br/>+ bytes da imagem"] --> LOAD["Decodificar<br/>+ pré-processar"]
    LOAD --> DET["Detector<br/>(YOLO / Mask R-CNN)"]
    DET --> CROP["Recortar + redimensionar<br/>cada detecção"]
    CROP --> CLS["Classificador<br/>(ConvNeXt-Tiny)"]
    CLS --> AGG["Agregar<br/>detecções + classes"]
    AGG --> SCHEMA["Validação<br/>Pydantic"]
    SCHEMA --> RESP["Resposta JSON"]

    REQ -.->|erro| RESP

    style DET fill:#fef3c7,stroke:#d97706
    style CLS fill:#dbeafe,stroke:#2563eb
    style SCHEMA fill:#dcfce7,stroke:#16a34a

Sete estágios. Os dois estágios de modelo são caros; os outros cinco estágios são onde os bugs vivem.

Contratos de dados com Pydantic

Toda fronteira de modelo se torna um objeto tipado. Isso transforma falhas silenciosas em falhas barulhentas.

Detection(
    box: tuple[float, float, float, float],   # (x1, y1, x2, y2), absolute pixels
    score: float,                              # [0, 1]
    class_id: int,                             # from detector's label map
    mask: Optional[list[list[int]]],           # RLE-encoded if present
)

PipelineResult(
    image_id: str,
    detections: list[Detection],
    classifications: list[Classification],
    inference_ms: float,
)

Quando um detector retorna caixas em (cx, cy, w, h) em vez de (x1, y1, x2, y2), a validação do Pydantic falha na fronteira e você descobre imediatamente, em vez de depurar um recorte downstream que silenciosamente retorna regiões vazias.

Para onde vai a latência

Três verdades valem em quase todo pipeline de visão:

  1. O pré-processamento costuma ser o maior bloco isolado. Decodificar JPEGs, converter espaços de cor, redimensionar — essas operações são limitadas por CPU e fáceis de esquecer.
  2. O detector domina o tempo de GPU. 70-90% do tempo de GPU está no forward pass de detecção.
  3. O pós-processamento (NMS, codificação/decodificação RLE) é barato na GPU, caro na CPU. Sempre faça o profiling com o alvo real.

Conhecer a distribuição é o que transforma a otimização em uma lista priorizada.

Modos de falha

  • Detecções vazias — retorne uma lista vazia, não quebre. Registre no log.
  • Caixas fora dos limites — limite ao tamanho da imagem antes de recortar.
  • Recortes minúsculos — pule a classificação para caixas menores que a entrada mínima do classificador.
  • Upload corrompido — resposta 400 com um código de erro específico, não 500.
  • Falha ao carregar o modelo — falhe na inicialização do serviço, não na primeira requisição.

Um pipeline de produção trata cada um desses casos sem escrever um try/except genérico que esconde a falha. Toda falha recebe um código nomeado e uma resposta.

Batching

Um serviço de produção atende múltiplos clientes. Agrupar detecções e classificações entre requisições multiplica o throughput. O trade-off: latência extra por esperar um lote encher. Configuração típica: coletar requisições por até 20ms, agrupá-las, processar, distribuir as respostas. torchserve e triton fazem isso nativamente; serviços pequenos com carga previsível implementam o próprio micro-batcher.

Construa

Passo 1: Contratos de dados

from pydantic import BaseModel, Field
from typing import List, Optional, Tuple

class Detection(BaseModel):
    box: Tuple[float, float, float, float]
    score: float = Field(ge=0, le=1)
    class_id: int = Field(ge=0)
    mask_rle: Optional[str] = None


class Classification(BaseModel):
    detection_index: int
    class_id: int
    class_name: str
    score: float = Field(ge=0, le=1)


class PipelineResult(BaseModel):
    image_id: str
    detections: List[Detection]
    classifications: List[Classification]
    inference_ms: float

Cinco segundos de código economizam uma hora de depuração em qualquer pipeline sério.

Passo 2: Uma classe Pipeline mínima

import time
import numpy as np
import torch
from PIL import Image

class VisionPipeline:
    def __init__(self, detector, classifier, class_names,
                 device="cpu", min_crop=32):
        self.detector = detector.to(device).eval()
        self.classifier = classifier.to(device).eval()
        self.class_names = class_names
        self.device = device
        self.min_crop = min_crop

    def preprocess(self, image):
        """
        image: PIL.Image or np.ndarray (H, W, 3) uint8
        returns: CHW float tensor on device
        """
        if isinstance(image, Image.Image):
            image = np.asarray(image.convert("RGB"))
        tensor = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
        return tensor.to(self.device)

    @torch.no_grad()
    def detect(self, image_tensor):
        return self.detector([image_tensor])[0]

    @torch.no_grad()
    def classify(self, crops):
        if len(crops) == 0:
            return []
        batch = torch.stack(crops).to(self.device)
        logits = self.classifier(batch)
        probs = logits.softmax(-1)
        scores, cls = probs.max(-1)
        return list(zip(cls.tolist(), scores.tolist()))

    def run(self, image, image_id="anonymous"):
        t0 = time.perf_counter()
        tensor = self.preprocess(image)
        det = self.detect(tensor)

        crops = []
        detections = []
        valid_indices = []
        for i, (box, score, cls) in enumerate(zip(det["boxes"], det["scores"], det["labels"])):
            x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
            x2 = min(x2, tensor.shape[-1])
            y2 = min(y2, tensor.shape[-2])
            detections.append(Detection(
                box=(x1, y1, x2, y2),
                score=float(score),
                class_id=int(cls),
            ))
            if (x2 - x1) < self.min_crop or (y2 - y1) < self.min_crop:
                continue
            crop = tensor[:, y1:y2, x1:x2]
            crop = torch.nn.functional.interpolate(
                crop.unsqueeze(0),
                size=(224, 224),
                mode="bilinear",
                align_corners=False,
            )[0]
            crops.append(crop)
            valid_indices.append(i)

        class_preds = self.classify(crops)

        classifications = []
        for valid_idx, (cls_id, cls_score) in zip(valid_indices, class_preds):
            classifications.append(Classification(
                detection_index=valid_idx,
                class_id=int(cls_id),
                class_name=self.class_names[cls_id],
                score=float(cls_score),
            ))

        return PipelineResult(
            image_id=image_id,
            detections=detections,
            classifications=classifications,
            inference_ms=(time.perf_counter() - t0) * 1000,
        )

Toda interface é tipada. Todo caminho de falha tem uma decisão de tratamento específica.

Passo 3: Conecte um detector e um classificador

from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
from torchvision.models import convnext_tiny

# Use ImageNet-pretrained weights for a realistic pipeline without training
detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT")
classifier = convnext_tiny(weights="DEFAULT")
class_names = [f"imagenet_class_{i}" for i in range(1000)]

pipe = VisionPipeline(detector, classifier, class_names)

# Smoke test with a synthetic image
test_image = (np.random.rand(400, 600, 3) * 255).astype(np.uint8)
result = pipe.run(test_image, image_id="demo")
print(result.model_dump_json(indent=2)[:500])

Passo 4: Serviço FastAPI

from fastapi import FastAPI, UploadFile, HTTPException
from io import BytesIO

app = FastAPI()
pipe = None  # initialised on startup

@app.on_event("startup")
def load():
    global pipe
    detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT").eval()
    classifier = convnext_tiny(weights="DEFAULT").eval()
    pipe = VisionPipeline(detector, classifier, class_names=[f"c{i}" for i in range(1000)])

@app.post("/detect")
async def detect_endpoint(file: UploadFile):
    if file.content_type not in {"image/jpeg", "image/png", "image/webp"}:
        raise HTTPException(status_code=400, detail="unsupported image type")
    data = await file.read()
    try:
        img = Image.open(BytesIO(data)).convert("RGB")
    except Exception:
        raise HTTPException(status_code=400, detail="cannot decode image")
    result = pipe.run(img, image_id=file.filename or "upload")
    return result.model_dump()

Execute com uvicorn main:app --host 0.0.0.0 --port 8000. Teste com curl -F 'file=@dog.jpg' http://localhost:8000/detect.

Passo 5: Faça benchmark do pipeline

import time

def benchmark(pipe, num_runs=20, image_size=(400, 600)):
    img = (np.random.rand(*image_size, 3) * 255).astype(np.uint8)
    pipe.run(img)  # warm up

    stages = {"preprocess": [], "detect": [], "classify": [], "total": []}
    for _ in range(num_runs):
        t0 = time.perf_counter()
        tensor = pipe.preprocess(img)
        t1 = time.perf_counter()
        det = pipe.detect(tensor)
        t2 = time.perf_counter()
        crops = []
        for box in det["boxes"]:
            x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
            x2 = min(x2, tensor.shape[-1])
            y2 = min(y2, tensor.shape[-2])
            if (x2 - x1) >= pipe.min_crop and (y2 - y1) >= pipe.min_crop:
                crop = tensor[:, y1:y2, x1:x2]
                crop = torch.nn.functional.interpolate(
                    crop.unsqueeze(0), size=(224, 224), mode="bilinear", align_corners=False
                )[0]
                crops.append(crop)
        pipe.classify(crops)
        t3 = time.perf_counter()
        stages["preprocess"].append((t1 - t0) * 1000)
        stages["detect"].append((t2 - t1) * 1000)
        stages["classify"].append((t3 - t2) * 1000)
        stages["total"].append((t3 - t0) * 1000)

    for stage, times in stages.items():
        times.sort()
        print(f"{stage:12s}  p50={times[len(times)//2]:7.1f} ms  p95={times[int(len(times)*0.95)]:7.1f} ms")

Saída típica em CPU: preprocess ~3 ms, detect 300-500 ms, classify 20-40 ms, total 350-550 ms. Em GPU, detect fica em 20-40 ms e o preprocess + classify passam a importar mais em termos relativos.

Use

Templates de produção convergem para a mesma estrutura, mais:

  • Versionamento de modelos — sempre registre no log o nome do modelo e o hash dos pesos na resposta.
  • IDs de trace por requisição — registre no log o tempo de cada estágio para cada requisição, para que você possa correlacionar respostas lentas com estágios.
  • Caminho de fallback — se o classificador estourar o tempo limite, retorne as detecções sem classificações em vez de falhar a requisição inteira.
  • Filtros de segurança — filtros de NSFW / PII rodam após a classificação, antes de a resposta deixar o serviço.
  • Endpoint em lote — um /detect_batch que aceita uma lista de URLs de imagens para processamento em massa.

Para servir em produção, torchserve, Triton Inference Server e BentoML lidam com batching, versionamento, métricas e health checks de fábrica. Rodar FastAPI diretamente é adequado para protótipos e produtos de pequena escala.

Entregue

Esta lição produz:

  • outputs/prompt-vision-service-shape-reviewer.md — um prompt que revisa o código de um serviço de visão em busca de violações de contrato/formato de resposta e nomeia o primeiro bug que quebra o sistema.
  • outputs/skill-pipeline-budget-planner.md — uma skill que, dadas uma latência e um throughput alvo, atribui um orçamento de tempo a cada estágio do pipeline e sinaliza qual estágio vai estourar seu orçamento primeiro.

Exercícios

  1. (Fácil) Execute o pipeline em 10 imagens de qualquer dataset aberto. Reporte o tempo médio por estágio e a distribuição da contagem de detecções por imagem.
  2. (Médio) Adicione um campo de saída de máscara ao Detection e codifique-o como RLE. Verifique se o JSON permanece abaixo de 1MB mesmo para uma imagem com 10 objetos.
  3. (Difícil) Adicione um micro-batcher na frente do classificador: colete recortes por até 10 ms, classifique todos em uma única chamada de GPU, retorne os resultados por requisição. Meça o ganho de throughput a 5 requisições concorrentes por segundo e a latência adicionada.

Termos-chave

Termo O que as pessoas dizem O que de fato significa
Pipeline "O sistema" Uma cadeia ordenada de etapas de pré-processamento, inferência e pós-processamento com uma interface tipada entre cada par
Contrato de dados "O schema" Definições Pydantic / dataclass às quais a entrada e a saída de cada estágio se conformam; captura bugs de integração na fronteira
Pré-processamento "Antes do modelo" Decodificação, conversão de cor, redimensionamento, normalização; geralmente o maior consumidor de tempo de CPU
Pós-processamento "Depois do modelo" NMS, redimensionamento de máscara, threshold, codificação RLE; barato na GPU, caro na CPU
Microbatcher "Coletar e então encaminhar" Agregador que espera uma janela fixa por múltiplas requisições e executa um único forward pass em lote
Trace ID "Id da requisição" Identificador por requisição registrado em cada estágio, para que requisições lentas possam ser rastreadas de ponta a ponta
Código de falha "Erro nomeado" Código de erro específico por classe de falha em vez de um 500 genérico; habilita a lógica de retry do cliente
Health check "Sonda de prontidão" Endpoint barato que reporta se o serviço consegue responder; os balanceadores de carga dependem disso

Leitura adicional

0 lifetime access. Curriculum based on AI Engineering from Scratch by Rohit Ghumare (MIT, used under attribution).