WebSocket Architecture
Querri uses WebSockets to provide real-time updates for project execution, chat interactions, and collaborative features.
Overview
WebSocket connections enable bidirectional communication between the frontend and backend:
- Step execution updates: Progress and results
- Chat responses: Streaming AI responses
- Collaborative editing: Multi-user project updates
- System notifications: Alerts and messages
Connection Flow
1. Establishing Connection

    // Frontend
    const ws = new WebSocket(
      `ws://localhost:8000/ws/project/${projectUuid}?token=${jwt}`
    );

    ws.onopen = () => {
      console.log('Connected to project WebSocket');
    };

2. Authentication
The backend validates the JWT token:

    async def websocket_endpoint(
        websocket: WebSocket,
        project_uuid: str,
        token: str
    ):
        # Validate JWT
        user = await validate_token(token)

        # Check permissions
        if not await can_view_project(user, project_uuid):
            # 403 is an HTTP status, not a WebSocket close code;
            # 1008 (policy violation) is the closest standard code
            await websocket.close(code=1008)
            return

        # Accept connection
        await websocket.accept()

3. Subscription
The connection subscribes to relevant Redis channels:

    # Subscribe to project events
    pubsub = redis.pubsub()
    await pubsub.subscribe(f"project:{project_uuid}")

Message Types
Step Execution Events
Step Started

    {
      "type": "step_started",
      "step_id": "step_123",
      "timestamp": "2024-10-21T10:00:00Z"
    }

Step Progress

    {
      "type": "step_progress",
      "step_id": "step_123",
      "progress": 0.45,
      "message": "Processing chunk 45/100"
    }

Step Completed

    {
      "type": "step_completed",
      "step_id": "step_123",
      "result": {
        "row_count": 1500,
        "columns": ["id", "name", "value"]
      },
      "timestamp": "2024-10-21T10:01:23Z"
    }

Step Error

    {
      "type": "step_error",
      "step_id": "step_123",
      "error": "Database connection failed",
      "timestamp": "2024-10-21T10:00:45Z"
    }

Chat Events
Chat Message

    {
      "type": "chat_message",
      "role": "user",
      "content": "Show me top customers by revenue",
      "timestamp": "2024-10-21T10:00:00Z"
    }

Chat Response (Streaming)

    {
      "type": "chat_response_chunk",
      "content": "Here are ",
      "is_complete": false
    }

    {
      "type": "chat_response_chunk",
      "content": "the top customers...",
      "is_complete": true
    }

Project Updates
Project Modified

    {
      "type": "project_modified",
      "user_id": "user_456",
      "changes": {
        "steps_added": ["step_789"],
        "steps_removed": [],
        "dataflows_added": []
      },
      "timestamp": "2024-10-21T10:00:00Z"
    }

Backend Architecture
WebSocket Manager

    class ConnectionManager:
        def __init__(self):
            self.active_connections: dict[str, list[WebSocket]] = {}

        async def connect(self, project_uuid: str, websocket: WebSocket):
            """Add connection to project room."""
            if project_uuid not in self.active_connections:
                self.active_connections[project_uuid] = []
            self.active_connections[project_uuid].append(websocket)

        async def disconnect(self, project_uuid: str, websocket: WebSocket):
            """Remove connection from project room."""
            connections = self.active_connections.get(project_uuid, [])
            # Guard against double disconnects and unknown projects
            if websocket in connections:
                connections.remove(websocket)
            # Drop empty rooms so the dict does not grow without bound
            if not connections:
                self.active_connections.pop(project_uuid, None)

        async def broadcast(self, project_uuid: str, message: dict):
            """Send message to all connections in project."""
            # Iterate over a copy: the room may change while sends are awaited
            for connection in list(self.active_connections.get(project_uuid, [])):
                await connection.send_json(message)

Redis Pub/Sub Integration
    import json

    async def redis_listener(project_uuid: str, manager: ConnectionManager):
        """Listen for Redis events and forward to WebSocket clients."""
        pubsub = redis.pubsub()
        await pubsub.subscribe(f"project:{project_uuid}")

        async for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations; forward only payloads
            if message["type"] == "message":
                data = json.loads(message["data"])
                await manager.broadcast(project_uuid, data)

Publishing Events
    import json
    from datetime import datetime, timezone

    async def publish_step_completed(project_uuid: str, step_id: str, result: dict):
        """Publish step completion event."""
        event = {
            "type": "step_completed",
            "step_id": step_id,
            "result": result,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        await redis.publish(f"project:{project_uuid}", json.dumps(event))

Frontend Integration
Connection Management

    class ProjectWebSocket {
      constructor(projectUuid, token) {
        this.projectUuid = projectUuid;
        this.token = token;
        this.ws = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.shouldReconnect = true;
      }

      connect() {
        this.shouldReconnect = true;
        this.ws = new WebSocket(
          `ws://localhost:8000/ws/project/${this.projectUuid}?token=${this.token}`
        );

        this.ws.onopen = () => {
          console.log('WebSocket connected');
          this.reconnectAttempts = 0;
        };

        this.ws.onmessage = (event) => {
          const message = JSON.parse(event.data);
          this.handleMessage(message);
        };

        this.ws.onclose = () => {
          console.log('WebSocket closed');
          // Do not reconnect after an intentional close()
          if (this.shouldReconnect) {
            this.reconnect();
          }
        };

        this.ws.onerror = (error) => {
          console.error('WebSocket error:', error);
        };
      }

      reconnect() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
          this.reconnectAttempts++;
          const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
          setTimeout(() => this.connect(), delay);
        }
      }

      handleMessage(message) {
        // Handlers are optional; `?.()` skips any that were never assigned
        switch (message.type) {
          case 'step_completed':
            this.onStepCompleted?.(message);
            break;
          case 'step_error':
            this.onStepError?.(message);
            break;
          case 'chat_response_chunk':
            this.onChatChunk?.(message);
            break;
          // ... handle other message types
        }
      }

      send(message) {
        // Messages sent while the socket is not open are dropped
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
          this.ws.send(JSON.stringify(message));
        }
      }

      close() {
        this.shouldReconnect = false;
        if (this.ws) {
          this.ws.close();
        }
      }
    }

Svelte Store Integration
    import { writable } from 'svelte/store';

    export const projectWebSocket = writable(null);

    export function connectToProject(projectUuid, token) {
      const ws = new ProjectWebSocket(projectUuid, token);

      ws.onStepCompleted = (message) => {
        // Update project store with new results
        // (`projects` is not defined in this snippet; it must be imported separately)
        projects.updateStepResult(message.step_id, message.result);
      };

      ws.connect();
      projectWebSocket.set(ws);

      return ws;
    }

Performance Considerations
Connection Limits
- Per user: 10 concurrent connections
- Per project: 100 concurrent viewers
- System-wide: 10,000 concurrent connections
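The three limits above could be enforced with a small counter that is consulted before a connection is accepted. This is a minimal sketch, not Querri's actual implementation: the `ConnectionLimiter` class and its method names are hypothetical, and the state lives in process memory, so a deployment with several backend processes would need shared counters instead.

```python
from collections import Counter

# Limits taken from the list above
MAX_PER_USER = 10
MAX_PER_PROJECT = 100
MAX_SYSTEM = 10_000


class ConnectionLimiter:
    """Hypothetical in-memory tracker for the three connection limits."""

    def __init__(self) -> None:
        self.per_user = Counter()
        self.per_project = Counter()
        self.total = 0

    def try_acquire(self, user_id: str, project_uuid: str) -> bool:
        """Reserve a slot, or return False if any limit is already reached."""
        if (
            self.total >= MAX_SYSTEM
            or self.per_user[user_id] >= MAX_PER_USER
            or self.per_project[project_uuid] >= MAX_PER_PROJECT
        ):
            return False
        self.per_user[user_id] += 1
        self.per_project[project_uuid] += 1
        self.total += 1
        return True

    def release(self, user_id: str, project_uuid: str) -> None:
        """Free a slot when the WebSocket closes."""
        if self.per_user[user_id] > 0 and self.per_project[project_uuid] > 0:
            self.per_user[user_id] -= 1
            self.per_project[project_uuid] -= 1
            self.total -= 1
```

In this sketch, an endpoint would call `try_acquire` before accepting the connection and `release` in its cleanup path.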
Message Rate Limiting
- User messages: 10/second
- System events: Unlimited
- Broadcasts: Throttled to 1/second per project
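One common way to implement limits like these is a token bucket for user messages plus a per-project timestamp check for broadcasts. The sketch below assumes that approach. The source does not say which algorithm Querri uses, nor whether throttled broadcasts are dropped or coalesced, and both class names are hypothetical.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` messages per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class BroadcastThrottle:
    """Allow at most one broadcast per `interval` seconds for each project."""

    def __init__(self, interval: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self.clock = clock
        self.last_sent = {}

    def allow(self, project_uuid: str) -> bool:
        now = self.clock()
        last = self.last_sent.get(project_uuid)
        if last is not None and now - last < self.interval:
            return False
        self.last_sent[project_uuid] = now
        return True
```

A per-user `TokenBucket(rate=10, capacity=10)` would correspond to the 10/second figure. The injectable `clock` is there only to make the classes easy to test.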
Connection Pooling
Backend reuses Redis connections:

    # Shared Redis connection pool
    redis_pool = redis.ConnectionPool(
        host='redis',
        port=6379,
        max_connections=100
    )

Reliability Features
Heartbeat/Ping
Keep connections alive:

    import asyncio

    async def heartbeat(websocket: WebSocket):
        """Send periodic ping to keep connection alive."""
        while True:
            try:
                await websocket.send_json({"type": "ping"})
                await asyncio.sleep(30)
            except Exception:
                # Stop when sending fails. Unlike a bare `except`, this lets
                # task cancellation (asyncio.CancelledError) propagate
                break

Message Acknowledgment
Critical messages can require acknowledgment:

    {
      "type": "step_completed",
      "step_id": "step_123",
      "requires_ack": true,
      "message_id": "msg_789"
    }

Reconnection Strategy
Exponential backoff with jitter:

    function getReconnectDelay(attempt) {
      const base = 1000;
      const max = 30000;
      const jitter = Math.random() * 1000;
      return Math.min(base * Math.pow(2, attempt) + jitter, max);
    }

Security
Authentication
- JWT validation on connection
- Token refresh during long connections
- Connection terminated on invalid token
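Because a connection can outlive the token that opened it, the server needs some way to track expiry per connection. The following is a minimal sketch of that bookkeeping, assuming the expiry comes from the token's `exp` claim. `TokenSession`, `next_action`, and the 60-second refresh margin are hypothetical and not taken from Querri's code.

```python
from dataclasses import dataclass

POLICY_VIOLATION = 1008  # WebSocket close code defined in RFC 6455


@dataclass
class TokenSession:
    """Tracks the expiry of the JWT that authenticated one connection."""

    expires_at: float              # Unix timestamp from the token's `exp` claim
    refresh_margin: float = 60.0   # hypothetical: ask for a new token 60 s early

    def needs_refresh(self, now: float) -> bool:
        return self.expires_at - now <= self.refresh_margin

    def is_expired(self, now: float) -> bool:
        return now >= self.expires_at

    def refresh(self, new_expires_at: float) -> None:
        """Record the expiry of a newly validated token."""
        self.expires_at = new_expires_at


def next_action(session: TokenSession, now: float) -> str:
    """Decide what the server should do with this connection right now."""
    if session.is_expired(now):
        return "close"            # e.g. close the socket with POLICY_VIOLATION
    if session.needs_refresh(now):
        return "request_refresh"  # e.g. ask the client to send a fresh token
    return "ok"
```

A periodic task, such as the heartbeat loop, could call `next_action` and act on the result.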
Authorization
- Per-project permission checks
- Message filtering based on permissions
- Rate limiting per user
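Message filtering can be pictured as a lookup from message type to the permission a recipient must hold. The sketch below is illustrative only: the permission names (`view`, `chat`) and the mapping are assumptions, since the source does not describe Querri's permission model. The message types are the ones listed under Message Types.

```python
# Hypothetical mapping from message type to the permission needed to receive it
REQUIRED_PERMISSION = {
    "step_started": "view",
    "step_progress": "view",
    "step_completed": "view",
    "step_error": "view",
    "chat_message": "chat",
    "chat_response_chunk": "chat",
    "project_modified": "view",
}


def visible_to(message: dict, permissions: set) -> bool:
    """Return True if a user holding `permissions` may receive `message`."""
    required = REQUIRED_PERMISSION.get(message.get("type"))
    # Unknown message types are dropped rather than leaked
    return required is not None and required in permissions


def filter_messages(messages: list, permissions: set) -> list:
    """Keep only the messages the user is allowed to see."""
    return [m for m in messages if visible_to(m, permissions)]
```

Dropping unknown types by default means a newly added message type stays hidden until someone assigns it a permission.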
Data Protection
- No sensitive data in WebSocket messages
- References to data instead of full datasets
- Audit logging of all messages
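Sending a reference instead of the data itself might look like the sketch below: the event carries only the row count and column names, as in the Step Completed example, plus a pointer the client can use to fetch rows separately. The `result_ref` field and the `build_step_completed` helper are hypothetical.

```python
def build_step_completed(step_id: str, rows: list, result_ref: str) -> dict:
    """Build a step_completed event that summarizes rows without embedding them."""
    columns = list(rows[0].keys()) if rows else []
    return {
        "type": "step_completed",
        "step_id": step_id,
        "result": {
            "row_count": len(rows),
            "columns": columns,
            # Hypothetical field: where the client can fetch the full dataset
            "result_ref": result_ref,
        },
    }
```

The payload size stays roughly constant however many rows the step produced.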
Monitoring
Metrics
- Active connections count
- Messages per second
- Connection duration
- Reconnection rate
- Error rate
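The metrics above can be derived from a handful of counters updated in the connection lifecycle. The following is a rough sketch with hypothetical names. The exact definitions are assumptions: here the reconnection rate is the share of connects that were reconnects, and the error rate is errors per second. A production setup would more likely export these counters to a metrics system.

```python
from dataclasses import dataclass, field


@dataclass
class WebSocketMetrics:
    """Hypothetical in-memory counters for the metrics listed above."""

    active_connections: int = 0
    total_connects: int = 0
    reconnections: int = 0
    messages_sent: int = 0
    errors: int = 0
    durations: list = field(default_factory=list)  # seconds, per closed connection

    def on_connect(self, is_reconnect: bool = False) -> None:
        self.active_connections += 1
        self.total_connects += 1
        if is_reconnect:
            self.reconnections += 1

    def on_disconnect(self, duration_seconds: float) -> None:
        self.active_connections -= 1
        self.durations.append(duration_seconds)

    def on_message(self) -> None:
        self.messages_sent += 1

    def on_error(self) -> None:
        self.errors += 1

    def snapshot(self, window_seconds: float) -> dict:
        """Summarize counters collected over a window of the given length."""
        closed = len(self.durations)
        connects = self.total_connects
        return {
            "active_connections": self.active_connections,
            "messages_per_second": self.messages_sent / window_seconds,
            "average_duration_seconds": sum(self.durations) / closed if closed else 0.0,
            "reconnection_rate": self.reconnections / connects if connects else 0.0,
            "errors_per_second": self.errors / window_seconds,
        }
```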
Logging
    {
      "event": "websocket_connected",
      "project_uuid": "proj_123",
      "user_id": "user_456",
      "timestamp": "2024-10-21T10:00:00Z"
    }

Troubleshooting
Connection Fails
Check:
- JWT token is valid
- User has project permissions
- WebSocket endpoint is accessible
- Firewall allows WebSocket connections
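For the first item, a quick local check is to decode the token's payload and look at its `exp` claim. The sketch below uses only the standard library and deliberately does not verify the signature, so it can show that a token has expired but cannot show that the backend will accept it. The function names are hypothetical.

```python
import base64
import json
import time


def jwt_claims_unverified(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated segments")
    payload = parts[1]
    # base64url segments omit padding; restore it before decoding
    padded = payload + "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def is_expired(token: str, now=None) -> bool:
    """Return True if the token carries an `exp` claim that is in the past."""
    claims = jwt_claims_unverified(token)
    current = time.time() if now is None else now
    return "exp" in claims and current >= claims["exp"]
```

If `is_expired` returns True, requesting a fresh token before reconnecting is the likely fix.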
Messages Not Received
Check:
- Connection is still open
- Redis pub/sub is working
- Event is being published correctly
- Message serialization is correct
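To check the last two items, it can help to run a published payload through a small validator before it reaches Redis. The sketch below is an assumption-laden example: the required fields are inferred from the sample messages under Message Types and are not an official schema, and `check_event` is a hypothetical helper.

```python
import json

# Required fields per message type, inferred from the examples in this document
REQUIRED_FIELDS = {
    "step_started": {"step_id", "timestamp"},
    "step_progress": {"step_id", "progress"},
    "step_completed": {"step_id"},
    "step_error": {"step_id", "error"},
    "chat_message": {"role", "content"},
    "chat_response_chunk": {"content", "is_complete"},
    "project_modified": {"user_id", "changes"},
}


def check_event(raw: str) -> list:
    """Return a list of problems found in a serialized event (empty if none)."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(event, dict):
        return ["event is not a JSON object"]
    event_type = event.get("type")
    if event_type not in REQUIRED_FIELDS:
        return [f"unknown type: {event_type!r}"]
    missing = sorted(REQUIRED_FIELDS[event_type] - set(event))
    return [f"missing field: {name}" for name in missing]
```

Logging the output of `check_event` next to each publish call narrows down whether a missing message was malformed or never sent.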
High Latency
Possible causes:
- Network congestion
- Redis overload
- Too many concurrent connections
- Large message payloads
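Of these causes, payload size is the easiest to measure from application code. The sketch below computes the serialized size of a message and flags large ones. The 64 KiB threshold and the helper names are hypothetical, since the source does not state a size limit.

```python
import json

MAX_PAYLOAD_BYTES = 64 * 1024  # hypothetical threshold, not a documented limit


def payload_size(message: dict) -> int:
    """Size in bytes of the message as compact UTF-8 JSON."""
    return len(json.dumps(message, separators=(",", ":")).encode("utf-8"))


def is_oversized(message: dict, limit: int = MAX_PAYLOAD_BYTES) -> bool:
    """Return True if the serialized message exceeds `limit` bytes."""
    return payload_size(message) > limit
```

Messages that trip this check are candidates for sending a data reference in place of the data.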