Voltar ao blog
30 de maio de 20264 min de leitura

WebSocket para Monitorar IA em Tempo Real

pythonwebsocketrealtime

Quando uma IA está "pensando", o que exatamente está acontecendo? Na maioria dos chatbots, você manda uma mensagem e espera. Talvez veja um "digitando..." genérico. Mas e se você pudesse ver em tempo real cada etapa: a classificação de intenção, a extração de triplas, a busca web, a criação de mundos, as inferências simbólicas? Isso transforma debugging de "por que respondeu errado?" em "ah, a extração falhou nesse passo específico".

Implementei comunicação via WebSocket onde o backend emite eventos para cada decisão interna do motor. O frontend tem um sidebar que lista eventos conforme acontecem e um grafo interativo que mostra os mundos evoluindo ao vivo. Não é só chat: é observabilidade completa do raciocínio da IA em tempo real.

Guia de tópicos:

  • Por Que WebSocket e Não REST para IA
  • Arquitetura de Eventos Internos
  • Tipos de Eventos que Vale Emitir
  • Streaming de Resposta em Chunks
  • Visualização de Grafo ao Vivo
  • Exemplo Prático em Python
  • Considerações Finais

Por Que WebSocket e Não REST para IA

REST funciona para request-response simples. Mas um motor de raciocínio faz muito mais que responder: extrai triplas, cria mundos, faz inferências, busca na web, tudo em background após responder. Se fosse REST, o frontend não saberia que isso está acontecendo. Teria que fazer polling ("já terminou? e agora? e agora?").

WebSocket permite comunicação bidirecional contínua: o frontend manda mensagens, o backend responde E emite eventos não solicitados. "Acabei de criar um mundo novo", "Encontrei uma contradição", "A curiosidade descobriu algo". O frontend reage em tempo real sem polling.

Arquitetura de Eventos Internos

O motor tem um callback que é chamado em pontos estratégicos do pipeline. Cada evento tem um tipo (string) e dados (dict). O servidor WebSocket recebe esses eventos e faz broadcast para clientes conectados.

Eventos são fire-and-forget: se nenhum cliente está conectado, descarta. Se o envio falha, captura silenciosamente. O motor nunca bloqueia esperando confirmação de entrega. Performance do raciocínio não é afetada pela observabilidade.

Tipos de Eventos que Vale Emitir

Não emita tudo. Emita o que ajuda a entender o fluxo:

Início/fim de processamento (chat_start, chat_complete). Classificação de intenção (intent_classified: CHAT ou WEB). Busca web (web_search_start, web_search_complete com contagem de resultados). Extração de triplas (extraction_complete com contagem). Matching de mundo (world_matched com similaridade, ou world_created). Inferências (reasoning_alert quando encontra contradição). Curiosidade (curiosity_triggered quando entropia alta é detectada).

Cada evento carrega dados mínimos mas suficientes: world_id afetado, contagens, similaridades. O frontend decide o que mostrar e o que filtrar.

Streaming de Resposta em Chunks

A resposta do LLM é enviada em chunks conforme gerada. O frontend recebe mensagens "chat_chunk" com fragmentos e monta progressivamente. Latência percebida cai de "esperar 5s pela resposta completa" para "ver o primeiro token em 500ms". É a diferença entre parecer lento e parecer responsivo.

Visualização de Grafo ao Vivo

Um endpoint retorna o estado atual do grafo: nós (mundos com topic, maturidade, entropia) e arestas (relações causais com peso). Quando eventos de criação/fusão chegam, o frontend atualiza o grafo automaticamente. Você vê novos nós aparecendo e arestas se formando conforme conversa.

Exemplo Prático em Python

import asyncio
import json
from typing import Callable, Optional

class EventEmitter:
    """Emit events to connected WebSocket clients."""

    def __init__(self):
        self._listeners: list[Callable] = []

    def add_listener(self, callback: Callable):
        self._listeners.append(callback)

    async def emit(self, event_type: str, data: dict):
        message = json.dumps({"type": event_type, **data})
        for listener in self._listeners:
            try:
                await listener(message)
            except Exception:
                pass  # Never block the engine


class ReasoningEngine:
    """Engine that emits events at each processing step."""

    def __init__(self):
        self.events = EventEmitter()

    async def process(self, user_input: str) -> str:
        await self.events.emit("chat_start", {"query": user_input})

        # Step 1: Classify intent
        intent = self._classify(user_input)
        await self.events.emit("intent_classified", {"intent": intent})

        # Step 2: Web search if needed
        snippets = []
        if intent == "WEB":
            await self.events.emit("web_search_start", {"query": user_input})
            snippets = self._search(user_input)
            await self.events.emit("web_search_complete", {"results": len(snippets)})

        # Step 3: Extract triples
        triples = self._extract(user_input, snippets)
        await self.events.emit("extraction_complete", {"count": len(triples)})

        # Step 4: Update knowledge graph
        world_id = self._update_graph(triples)
        await self.events.emit("world_updated", {
            "world_id": world_id, "new_triples": len(triples)
        })

        # Step 5: Run inferences
        alerts = self._infer(world_id)
        if alerts:
            await self.events.emit("reasoning_alert", {"alerts": alerts})

        # Step 6: Generate response
        response = self._respond(user_input)
        await self.events.emit("chat_complete", {"response_length": len(response)})

        return response

    def _classify(self, text):
        return "WEB" if "?" in text and len(text.split()) > 5 else "CHAT"

    def _search(self, query):
        return ["snippet1", "snippet2"]  # Simplified

    def _extract(self, text, snippets):
        return [{"s": "example", "v": "is", "o": "demo", "c": 0.8}]

    def _update_graph(self, triples):
        return "world_abc"

    def _infer(self, world_id):
        return []

    def _respond(self, text):
        return f"Response to: {text}"


# Demo: simulate with event logging
async def main():
    engine = ReasoningEngine()

    # Register event listener (in production: send via WebSocket)
    events_log = []
    async def log_event(message):
        data = json.loads(message)
        events_log.append(data)
        event_type = data.pop("type")
        print(f"  📡 {event_type}: {json.dumps(data)}")

    engine.events.add_listener(log_event)

    print("Processing: 'What is the capital of France?'\n")
    response = await engine.process("What is the capital of France?")
    print(f"\nResponse: {response}")
    print(f"Total events emitted: {len(events_log)}")

asyncio.run(main())
# Output:
#   📡 chat_start: {"query": "What is the capital of France?"}
#   📡 intent_classified: {"intent": "WEB"}
#   📡 web_search_start: {"query": "What is the capital of France?"}
#   📡 web_search_complete: {"results": 2}
#   📡 extraction_complete: {"count": 1}
#   📡 world_updated: {"world_id": "world_abc", "new_triples": 1}
#   📡 chat_complete: {"response_length": 45}

Considerações Finais

WebSocket com eventos transforma uma IA de "caixa preta" em "caixa de vidro". Você vê cada decisão, cada extração, cada inferência. Quando algo dá errado, sabe exatamente onde. O feedback loop de desenvolvimento fica ordens de magnitude mais rápido que ler logs depois.

O padrão é aplicável a qualquer sistema de IA complexo: pipelines de RAG, agentes autônomos, sistemas de recomendação. Emita eventos em tempo real e visualize ao vivo. Seus bugs vão agradecer.


Links indicativos: