WebSocket para Monitorar IA em Tempo Real
Quando uma IA está "pensando", o que exatamente está acontecendo? Na maioria dos chatbots, você manda uma mensagem e espera. Talvez veja um "digitando..." genérico. Mas e se você pudesse ver em tempo real cada etapa: a classificação de intenção, a extração de triplas, a busca web, a criação de mundos, as inferências simbólicas? Isso transforma debugging de "por que respondeu errado?" em "ah, a extração falhou nesse passo específico".
Implementei comunicação via WebSocket onde o backend emite eventos para cada decisão interna do motor. O frontend tem um sidebar que lista eventos conforme acontecem e um grafo interativo que mostra os mundos evoluindo ao vivo. Não é só chat: é observabilidade completa do raciocínio da IA em tempo real.
Guia de tópicos:
- Por Que WebSocket e Não REST para IA
- Arquitetura de Eventos Internos
- Tipos de Eventos que Vale Emitir
- Streaming de Resposta em Chunks
- Visualização de Grafo ao Vivo
- Exemplo Prático em Python
- Considerações Finais
Por Que WebSocket e Não REST para IA
REST funciona para request-response simples. Mas um motor de raciocínio faz muito mais que responder: extrai triplas, cria mundos, faz inferências, busca na web, tudo em background após responder. Se fosse REST, o frontend não saberia que isso está acontecendo. Teria que fazer polling ("já terminou? e agora? e agora?").
WebSocket permite comunicação bidirecional contínua: o frontend manda mensagens, o backend responde E emite eventos não solicitados. "Acabei de criar um mundo novo", "Encontrei uma contradição", "A curiosidade descobriu algo". O frontend reage em tempo real sem polling.
Arquitetura de Eventos Internos
O motor tem um callback que é chamado em pontos estratégicos do pipeline. Cada evento tem um tipo (string) e dados (dict). O servidor WebSocket recebe esses eventos e faz broadcast para clientes conectados.
Eventos são fire-and-forget: se nenhum cliente está conectado, descarta. Se o envio falha, captura silenciosamente. O motor nunca bloqueia esperando confirmação de entrega. Performance do raciocínio não é afetada pela observabilidade.
Tipos de Eventos que Vale Emitir
Não emita tudo. Emita o que ajuda a entender o fluxo:
Início/fim de processamento (chat_start, chat_complete). Classificação de intenção (intent_classified: CHAT ou WEB). Busca web (web_search_start, web_search_complete com contagem de resultados). Extração de triplas (extraction_complete com contagem). Matching de mundo (world_matched com similaridade, ou world_created). Inferências (reasoning_alert quando encontra contradição). Curiosidade (curiosity_triggered quando entropia alta é detectada).
Cada evento carrega dados mínimos mas suficientes: world_id afetado, contagens, similaridades. O frontend decide o que mostrar e o que filtrar.
Streaming de Resposta em Chunks
A resposta do LLM é enviada em chunks conforme gerada. O frontend recebe mensagens "chat_chunk" com fragmentos e monta progressivamente. Latência percebida cai de "esperar 5s pela resposta completa" para "ver o primeiro token em 500ms". É a diferença entre parecer lento e parecer responsivo.
Visualização de Grafo ao Vivo
Um endpoint retorna o estado atual do grafo: nós (mundos com topic, maturidade, entropia) e arestas (relações causais com peso). Quando eventos de criação/fusão chegam, o frontend atualiza o grafo automaticamente. Você vê novos nós aparecendo e arestas se formando conforme conversa.
Exemplo Prático em Python
import asyncio
import json
from typing import Callable, Optional
class EventEmitter:
"""Emit events to connected WebSocket clients."""
def __init__(self):
self._listeners: list[Callable] = []
def add_listener(self, callback: Callable):
self._listeners.append(callback)
async def emit(self, event_type: str, data: dict):
message = json.dumps({"type": event_type, **data})
for listener in self._listeners:
try:
await listener(message)
except Exception:
pass # Never block the engine
class ReasoningEngine:
"""Engine that emits events at each processing step."""
def __init__(self):
self.events = EventEmitter()
async def process(self, user_input: str) -> str:
await self.events.emit("chat_start", {"query": user_input})
# Step 1: Classify intent
intent = self._classify(user_input)
await self.events.emit("intent_classified", {"intent": intent})
# Step 2: Web search if needed
snippets = []
if intent == "WEB":
await self.events.emit("web_search_start", {"query": user_input})
snippets = self._search(user_input)
await self.events.emit("web_search_complete", {"results": len(snippets)})
# Step 3: Extract triples
triples = self._extract(user_input, snippets)
await self.events.emit("extraction_complete", {"count": len(triples)})
# Step 4: Update knowledge graph
world_id = self._update_graph(triples)
await self.events.emit("world_updated", {
"world_id": world_id, "new_triples": len(triples)
})
# Step 5: Run inferences
alerts = self._infer(world_id)
if alerts:
await self.events.emit("reasoning_alert", {"alerts": alerts})
# Step 6: Generate response
response = self._respond(user_input)
await self.events.emit("chat_complete", {"response_length": len(response)})
return response
def _classify(self, text):
return "WEB" if "?" in text and len(text.split()) > 5 else "CHAT"
def _search(self, query):
return ["snippet1", "snippet2"] # Simplified
def _extract(self, text, snippets):
return [{"s": "example", "v": "is", "o": "demo", "c": 0.8}]
def _update_graph(self, triples):
return "world_abc"
def _infer(self, world_id):
return []
def _respond(self, text):
return f"Response to: {text}"
# Demo: simulate with event logging
async def main():
engine = ReasoningEngine()
# Register event listener (in production: send via WebSocket)
events_log = []
async def log_event(message):
data = json.loads(message)
events_log.append(data)
event_type = data.pop("type")
print(f" 📡 {event_type}: {json.dumps(data)}")
engine.events.add_listener(log_event)
print("Processing: 'What is the capital of France?'\n")
response = await engine.process("What is the capital of France?")
print(f"\nResponse: {response}")
print(f"Total events emitted: {len(events_log)}")
asyncio.run(main())
# Output:
# 📡 chat_start: {"query": "What is the capital of France?"}
# 📡 intent_classified: {"intent": "WEB"}
# 📡 web_search_start: {"query": "What is the capital of France?"}
# 📡 web_search_complete: {"results": 2}
# 📡 extraction_complete: {"count": 1}
# 📡 world_updated: {"world_id": "world_abc", "new_triples": 1}
# 📡 chat_complete: {"response_length": 45}
Considerações Finais
WebSocket com eventos transforma uma IA de "caixa preta" em "caixa de vidro". Você vê cada decisão, cada extração, cada inferência. Quando algo dá errado, sabe exatamente onde. O feedback loop de desenvolvimento fica ordens de magnitude mais rápido que ler logs depois.
O padrão é aplicável a qualquer sistema de IA complexo: pipelines de RAG, agentes autônomos, sistemas de recomendação. Emita eventos em tempo real e visualize ao vivo. Seus bugs vão agradecer.
Links indicativos:
- WebSockets: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API
- Event-Driven Architecture: https://en.wikipedia.org/wiki/Event-driven_architecture
- vis-network (graph visualization): https://visjs.github.io/vis-network/