Skip to content

WebSocket Architecture

Querri uses WebSockets to provide real-time updates for project execution, chat interactions, and collaborative features.

WebSocket connections enable bidirectional communication between the frontend and backend:

  • Step execution updates: Progress and results
  • Chat responses: Streaming AI responses
  • Collaborative editing: Multi-user project updates
  • System notifications: Alerts and messages
// Frontend: open the project socket, passing the JWT as a query parameter.
const ws = new WebSocket(`ws://localhost:8000/ws/project/${projectUuid}?token=${jwt}`);

// Log once the handshake has completed.
ws.onopen = function handleOpen() {
  console.log('Connected to project WebSocket');
};

The backend validates the JWT and checks project permissions before accepting the connection:

async def websocket_endpoint(
    websocket: WebSocket,
    project_uuid: str,
    token: str
):
    """Authenticate and authorize a client, then accept its WebSocket.

    Args:
        websocket: The incoming, not-yet-accepted connection.
        project_uuid: Project whose event stream the client wants to join.
        token: JWT supplied by the client (sent as the ``token`` query
            parameter by the frontend).

    The connection is accepted only when the token resolves to a user who
    may view the project; otherwise it is closed and the handler returns.
    """
    # Validate JWT
    # NOTE(review): how validate_token signals an invalid token (exception
    # vs. falsy return) is not visible here -- confirm and handle it.
    user = await validate_token(token)
    # Check permissions
    if not await can_view_project(user, project_uuid):
        # WebSocket close codes must lie in 1000-4999 (RFC 6455 section 7.4);
        # an HTTP status such as 403 is not valid. 1008 = policy violation.
        await websocket.close(code=1008)
        return
    # Accept connection
    await websocket.accept()

The connection subscribes to relevant Redis channels:

# Subscribe to project events
pubsub = redis.pubsub()
await pubsub.subscribe(f"project:{project_uuid}")
{
"type": "step_started",
"step_id": "step_123",
"timestamp": "2024-10-21T10:00:00Z"
}
{
"type": "step_progress",
"step_id": "step_123",
"progress": 0.45,
"message": "Processing chunk 45/100"
}
{
"type": "step_completed",
"step_id": "step_123",
"result": {
"row_count": 1500,
"columns": ["id", "name", "value"]
},
"timestamp": "2024-10-21T10:01:23Z"
}
{
"type": "step_error",
"step_id": "step_123",
"error": "Database connection failed",
"timestamp": "2024-10-21T10:00:45Z"
}
{
"type": "chat_message",
"role": "user",
"content": "Show me top customers by revenue",
"timestamp": "2024-10-21T10:00:00Z"
}
{
"type": "chat_response_chunk",
"content": "Here are ",
"is_complete": false
}
{
"type": "chat_response_chunk",
"content": "the top customers...",
"is_complete": true
}
{
"type": "project_modified",
"user_id": "user_456",
"changes": {
"steps_added": ["step_789"],
"steps_removed": [],
"dataflows_added": []
},
"timestamp": "2024-10-21T10:00:00Z"
}
class ConnectionManager:
    """Track open WebSocket connections grouped by project ("rooms")."""

    def __init__(self):
        # Maps project UUID -> connections currently viewing that project.
        self.active_connections: dict[str, list[WebSocket]] = {}

    async def connect(self, project_uuid: str, websocket: WebSocket):
        """Add connection to project room, creating the room on first use."""
        self.active_connections.setdefault(project_uuid, []).append(websocket)

    async def disconnect(self, project_uuid: str, websocket: WebSocket):
        """Remove connection from project room.

        Safe to call for an unknown room or an already-removed connection,
        so cleanup paths may call it more than once. Empty rooms are dropped
        so the mapping does not grow with every project ever opened.
        """
        room = self.active_connections.get(project_uuid)
        if room is None:
            return
        try:
            room.remove(websocket)
        except ValueError:
            # Connection was never registered or has already been removed.
            return
        if not room and self.active_connections.get(project_uuid) is room:
            del self.active_connections[project_uuid]

    async def broadcast(self, project_uuid: str, message: dict):
        """Send message to all connections in project.

        A connection whose send fails is removed from the room and the
        remaining connections still receive the message.
        """
        # Iterate over a snapshot: connect()/disconnect() may run while this
        # coroutine is suspended in send_json() and mutate the room list.
        for connection in list(self.active_connections.get(project_uuid, ())):
            try:
                await connection.send_json(message)
            except Exception:
                # One dead socket must not abort delivery to the others.
                await self.disconnect(project_uuid, connection)
async def redis_listener(project_uuid: str, manager: ConnectionManager):
    """Listen for Redis events and forward to WebSocket clients.

    Args:
        project_uuid: Project whose ``project:<uuid>`` channel is consumed.
        manager: Connection manager used to fan each event out to clients.

    Runs until the pub/sub stream ends or the task is cancelled.
    """
    channel = f"project:{project_uuid}"
    # The context manager releases the subscription and its connection when
    # the listener exits, including on cancellation.
    # NOTE(review): assumes `redis` is an asyncio Redis client whose pubsub()
    # object supports `async with` -- confirm against the client in use.
    async with redis.pubsub() as pubsub:
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations; only forward payloads.
            if message["type"] != "message":
                continue
            data = json.loads(message["data"])
            await manager.broadcast(project_uuid, data)
async def publish_step_completed(project_uuid: str, step_id: str, result: dict):
    """Publish step completion event.

    Args:
        project_uuid: Project whose ``project:<uuid>`` channel receives the event.
        step_id: Identifier of the step that finished.
        result: JSON-serializable result payload for the step.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated and yields a naive value with no zone
    # marker; emit an explicit UTC timestamp ending in "Z", matching the
    # documented message format.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    event = {
        "type": "step_completed",
        "step_id": step_id,
        "result": result,
        "timestamp": timestamp,
    }
    await redis.publish(f"project:{project_uuid}", json.dumps(event))
/**
 * Client for a project's WebSocket stream with automatic reconnection.
 *
 * Assign `onStepCompleted`, `onStepError` and `onChatChunk` to receive the
 * corresponding messages; handlers that are not set are skipped.
 */
class ProjectWebSocket {
  /**
   * @param {string} projectUuid Project to subscribe to.
   * @param {string} token JWT sent as the `token` query parameter.
   * @param {string} [baseUrl] WebSocket origin; defaults to the local dev server.
   */
  constructor(projectUuid, token, baseUrl = 'ws://localhost:8000') {
    this.projectUuid = projectUuid;
    this.token = token;
    this.baseUrl = baseUrl;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    // Cleared by close() so a deliberate shutdown does not reconnect.
    this.shouldReconnect = true;
    this.reconnectTimer = null;
  }

  /** Open the socket and wire up the event handlers. */
  connect() {
    this.shouldReconnect = true;
    const project = encodeURIComponent(this.projectUuid);
    const token = encodeURIComponent(this.token);
    this.ws = new WebSocket(
      `${this.baseUrl}/ws/project/${project}?token=${token}`
    );
    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.reconnectAttempts = 0;
    };
    this.ws.onmessage = (event) => {
      let message;
      try {
        message = JSON.parse(event.data);
      } catch (error) {
        // A malformed frame must not break the message handler.
        console.error('WebSocket error:', error);
        return;
      }
      this.handleMessage(message);
    };
    this.ws.onclose = () => {
      console.log('WebSocket closed');
      this.reconnect();
    };
    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  /** Schedule a reconnect with exponential backoff, up to the attempt limit. */
  reconnect() {
    // close() was called on purpose: stay closed.
    if (!this.shouldReconnect) {
      return;
    }
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        if (this.shouldReconnect) {
          this.connect();
        }
      }, delay);
    }
  }

  /** Route a parsed message to the matching handler, if one is set. */
  handleMessage(message) {
    let handler;
    switch (message.type) {
      case 'step_completed':
        handler = this.onStepCompleted;
        break;
      case 'step_error':
        handler = this.onStepError;
        break;
      case 'chat_response_chunk':
        handler = this.onChatChunk;
        break;
      // ... handle other message types
    }
    if (typeof handler === 'function') {
      handler.call(this, message);
    }
  }

  /**
   * Send a JSON-serializable message if the socket is open.
   * @returns {boolean} true when the message was handed to the socket.
   */
  send(message) {
    // this.ws is null until connect() has been called.
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
      return true;
    }
    return false;
  }

  /** Close the socket permanently and cancel any pending reconnect. */
  close() {
    this.shouldReconnect = false;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      this.ws.close();
    }
  }
}
stores/websocket.js
import { writable } from 'svelte/store';
// Store holding the active ProjectWebSocket; null until connectToProject() sets one.
export const projectWebSocket = writable(null);
/**
 * Create a ProjectWebSocket for the given project, connect it, and publish it
 * through the `projectWebSocket` store.
 *
 * @param {string} projectUuid Project to connect to.
 * @param {string} token JWT passed to the socket.
 * @returns {ProjectWebSocket} The connected socket wrapper.
 */
export function connectToProject(projectUuid, token) {
  const socket = new ProjectWebSocket(projectUuid, token);

  // Update project store with new results
  socket.onStepCompleted = ({ step_id, result }) => {
    projects.updateStepResult(step_id, result);
  };

  socket.connect();
  projectWebSocket.set(socket);
  return socket;
}
  • Per user: 10 concurrent connections
  • Per project: 100 concurrent viewers
  • System-wide: 10,000 concurrent connections
  • User messages: 10/second
  • System events: Unlimited
  • Broadcasts: Throttled to 1/second per project

Backend reuses Redis connections:

# Shared Redis connection pool: created once and reused, capped at 100 connections.
redis_pool = redis.ConnectionPool(host="redis", port=6379, max_connections=100)

Keep connections alive:

async def heartbeat(websocket: WebSocket, interval: float = 30):
    """Send periodic ping to keep connection alive.

    Args:
        websocket: Connection to ping.
        interval: Seconds to wait between pings (default 30).

    Returns when a ping can no longer be sent. Task cancellation is not
    swallowed: ``asyncio.CancelledError`` propagates to the caller.
    """
    while True:
        try:
            await websocket.send_json({"type": "ping"})
        except Exception:
            # The send failed, so the connection is treated as gone. A bare
            # `except:` here would also trap CancelledError and hide
            # cancellation from whoever is awaiting this task.
            break
        await asyncio.sleep(interval)

Critical messages can require acknowledgment:

{
"type": "step_completed",
"step_id": "step_123",
"requires_ack": true,
"message_id": "msg_789"
}

Exponential backoff with jitter:

/**
 * Compute the delay before reconnect attempt `attempt` (0-based):
 * exponential backoff from 1 s plus up to 1 s of random jitter,
 * never exceeding 30 s in total.
 *
 * @param {number} attempt Number of reconnect attempts made so far.
 * @returns {number} Delay in milliseconds.
 */
function getReconnectDelay(attempt) {
  const base = 1000;
  const max = 30000;
  const jitterRange = 1000;
  const jitter = Math.random() * jitterRange;
  // Cap the exponential part below `max` so the jitter survives the cap;
  // capping the sum would make every late attempt wait exactly `max`.
  const backoff = Math.min(base * Math.pow(2, attempt), max - jitterRange);
  return backoff + jitter;
}
  • JWT validation on connection
  • Token refresh during long connections
  • Connection terminated on invalid token
  • Per-project permission checks
  • Message filtering based on permissions
  • Rate limiting per user
  • No sensitive data in WebSocket messages
  • References to data instead of full datasets
  • Audit logging of all messages
  • Active connections count
  • Messages per second
  • Connection duration
  • Reconnection rate
  • Error rate
{
"event": "websocket_connected",
"project_uuid": "proj_123",
"user_id": "user_456",
"timestamp": "2024-10-21T10:00:00Z"
}

If the connection cannot be established, check:

  1. JWT token is valid
  2. User has project permissions
  3. WebSocket endpoint is accessible
  4. Firewall allows WebSocket connections

If messages are not being received, check:

  1. Connection is still open
  2. Redis pub/sub is working
  3. Event is being published correctly
  4. Message serialization is correct

Possible causes of slow or delayed messages:

  1. Network congestion
  2. Redis overload
  3. Too many concurrent connections
  4. Large message payloads