# Data Processing Pipeline

Querri’s data processing engine is built around the QDF (Querri Data Frame) format, optimized for large-scale data transformations.
## QDF (Querri Data Frame)

QDF is Querri’s internal data structure for representing tabular data.
### Structure

```json
{
  "columns": [
    { "name": "customer_id", "type": "integer", "nullable": false },
    { "name": "revenue", "type": "float", "nullable": true }
  ],
  "data": [
    [1, 1500.00],
    [2, 2300.50],
    [3, 890.25]
  ],
  "row_count": 3,
  "metadata": {
    "source": "mysql_connector",
    "timestamp": "2024-10-21T10:00:00Z"
  }
}
```

### Features

- Type safety: Enforced column types
- Streaming support: Process data in chunks
- Memory efficient: Optimized storage format
- Serializable: Easy to store in MongoDB
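The structure above maps naturally onto a small dataclass. The `QDF` and `Column` classes below are a hypothetical sketch for illustration only, not Querri’s internal implementation:

```python
from dataclasses import dataclass, field

@dataclass
class Column:
    # Illustrative stand-in for a QDF column definition
    name: str
    type: str
    nullable: bool = True

@dataclass
class QDF:
    # Illustrative stand-in for the QDF container
    columns: list                       # list[Column]
    data: list                          # list of rows, each a list of values
    metadata: dict = field(default_factory=dict)

    @property
    def row_count(self) -> int:
        return len(self.data)

# Build the example QDF from the Structure section
qdf = QDF(
    columns=[
        Column("customer_id", "integer", nullable=False),
        Column("revenue", "float", nullable=True),
    ],
    data=[[1, 1500.00], [2, 2300.50], [3, 890.25]],
    metadata={"source": "mysql_connector", "timestamp": "2024-10-21T10:00:00Z"},
)
```

Deriving `row_count` from the data (rather than storing it) avoids the two ever drifting apart in memory; the stored document carries it explicitly so loads can be integrity-checked.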
## Step Execution Pipeline

### 1. Request Phase

User → Frontend → API → Step Executor

The user triggers step execution from the frontend, which calls the backend API.
### 2. Validation Phase

The step executor validates:

- Step configuration is complete
- Input data is available
- User has permissions
- Resources are available
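The checks above can be sketched as a single guard function. This is an illustrative sketch; the function signature, config fields, and permission names are assumptions, not Querri’s actual API:

```python
# Hypothetical sketch of the validation phase; names are illustrative.
class ValidationError(Exception):
    """Raised when a step fails pre-execution validation."""

def validate_step(config: dict, inputs: dict, permissions: set,
                  available_memory_mb: int) -> None:
    # Step configuration is complete
    missing = [f for f in config.get("required_fields", [])
               if f not in config.get("values", {})]
    if missing:
        raise ValidationError(f"Incomplete configuration: {missing}")
    # Input data is available
    for name in config.get("inputs", []):
        if inputs.get(name) is None:
            raise ValidationError(f"Missing input: {name}")
    # User has permissions
    if "execute_step" not in permissions:
        raise ValidationError("User lacks execute permission")
    # Resources are available
    if available_memory_mb < config.get("required_memory_mb", 0):
        raise ValidationError("Insufficient memory available")
```

Raising on the first failed check keeps the error message specific; a variant could collect all failures and report them together.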
### 3. Data Loading Phase

For steps with inputs:

- Load QDF from previous steps
- Deserialize from MongoDB
- Apply any required transformations
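A minimal sketch of the deserialize step, assuming the QDF is stored as a document shaped like the Structure section above (the function and field names are illustrative assumptions):

```python
import json

def deserialize_qdf(document: dict) -> dict:
    """Rebuild an in-memory QDF from its stored document form.

    Checks row_count against the actual data as a cheap integrity
    check after loading. Illustrative sketch, not Querri's code.
    """
    qdf = {
        "columns": document["columns"],
        "data": document["data"],
        "metadata": document.get("metadata", {}),
    }
    if document.get("row_count") != len(qdf["data"]):
        raise ValueError("row_count does not match stored data")
    return qdf

# Example: a document as it might come back from MongoDB
stored = json.loads("""
{
  "columns": [{"name": "customer_id", "type": "integer", "nullable": false}],
  "data": [[1], [2], [3]],
  "row_count": 3
}
""")
qdf = deserialize_qdf(stored)
```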
### 4. Processing Phase

Execute the step’s core logic:

#### Transform Steps

```python
async def execute(self, input_qdf: QDF) -> QDF:
    # Apply transformations
    result = transform(input_qdf, self.config)
    return result
```

#### Analysis Steps

```python
async def execute(self, input_qdf: QDF) -> QDF:
    # Perform analysis
    result = analyze(input_qdf, self.config)
    return result
```

### 5. Storage Phase

- Serialize QDF to MongoDB
- Update step status
- Emit completion event
- Notify frontend via WebSocket
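The storage phase steps can be sketched as one coroutine. The store and emit callables are injected here so the flow can be shown without a real MongoDB or WebSocket connection; all names are illustrative assumptions:

```python
import json
from datetime import datetime, timezone

# Hypothetical sketch of the storage phase.
async def store_result(step_id: str, qdf: dict, save_document, emit_event):
    document = {
        "step_id": step_id,
        "qdf": json.dumps(qdf),                 # serialize for storage
        "status": "completed",                  # update step status
        "completed_at": datetime.now(timezone.utc).isoformat(),
    }
    await save_document(document)               # e.g. insert into MongoDB
    # Completion event; a WebSocket broadcaster would consume this
    await emit_event({"type": "step_completed", "step_id": step_id})
```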
## Data Transformations

### Filter

```python
def filter_rows(qdf: QDF, condition: dict) -> QDF:
    """
    Filter rows based on condition.

    condition = {
        "column": "revenue",
        "operator": "greater_than",
        "value": 1000
    }
    """
```
### Select

```python
def select_columns(qdf: QDF, columns: list[str]) -> QDF:
    """Keep only specified columns."""
```
### Calculate

```python
def add_calculated_column(
    qdf: QDF,
    name: str,
    expression: str,
) -> QDF:
    """
    Add new column with calculation.

    expression = "quantity * price"
    """
```
### Group By

```python
def group_by(
    qdf: QDF,
    group_columns: list[str],
    aggregations: list[dict],
) -> QDF:
    """
    Group and aggregate data.

    aggregations = [
        {"column": "revenue", "function": "sum"},
        {"column": "orders", "function": "count"}
    ]
    """
```

### Join

```python
def join(
    left_qdf: QDF,
    right_qdf: QDF,
    join_type: str,
    on: dict,
) -> QDF:
    """
    Join two QDFs.

    join_type: "inner" | "left" | "right" | "outer"
    on = {
        "left": "customer_id",
        "right": "id"
    }
    """
```
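Taken together, these helpers compose into a small pipeline. The sketch below implements simplified, list-of-rows versions of `filter_rows` and `group_by` (a QDF is modeled as a plain dict) just to show the intended call pattern; it is illustrative, not Querri’s implementation:

```python
# Simplified stand-ins, operating on {"columns": [names], "data": [rows]}.
def filter_rows(qdf, condition):
    idx = qdf["columns"].index(condition["column"])
    ops = {"greater_than": lambda v: v > condition["value"]}
    keep = ops[condition["operator"]]
    return {"columns": qdf["columns"],
            "data": [row for row in qdf["data"] if keep(row[idx])]}

def group_by(qdf, group_columns, aggregations):
    gi = [qdf["columns"].index(c) for c in group_columns]
    groups = {}
    for row in qdf["data"]:
        groups.setdefault(tuple(row[i] for i in gi), []).append(row)
    out_cols = group_columns + [f'{a["column"]}_{a["function"]}'
                                for a in aggregations]
    out_rows = []
    for key, rows in groups.items():
        aggs = []
        for a in aggregations:
            ci = qdf["columns"].index(a["column"])
            vals = [r[ci] for r in rows]
            aggs.append(sum(vals) if a["function"] == "sum" else len(vals))
        out_rows.append(list(key) + aggs)
    return {"columns": out_cols, "data": out_rows}

orders = {"columns": ["region", "revenue"],
          "data": [["east", 1500.0], ["west", 800.0], ["east", 2300.5]]}

# Keep high-value orders, then total revenue per region
high_value = filter_rows(orders, {"column": "revenue",
                                  "operator": "greater_than", "value": 1000})
by_region = group_by(high_value, ["region"],
                     [{"column": "revenue", "function": "sum"}])
```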
## Performance Optimizations

### Streaming Processing

For large datasets:

```python
async def process_in_chunks(qdf: QDF, chunk_size: int = 10000):
    """Process data in chunks to manage memory."""
    for chunk in qdf.iter_chunks(chunk_size):
        result_chunk = process_chunk(chunk)
        yield result_chunk
```
### Lazy Evaluation

Steps don’t execute until explicitly requested:

```python
# Build pipeline
step1 = LoadData(source="database")
step2 = FilterData(step1, condition=...)
step3 = Aggregate(step2, groupby=...)

# Execute only when needed
result = await step3.execute()
```
### Caching

Frequently accessed data is cached in Redis:

```python
cache_key = f"qdf:{step_uuid}:{version}"
cached = await redis.get(cache_key)
if cached:
    return deserialize(cached)
```
### Parallel Processing

Independent steps can run in parallel:

```python
import asyncio

async def execute_parallel(steps: list[Step]):
    tasks = [step.execute() for step in steps]
    results = await asyncio.gather(*tasks)
    return results
```
## Data Type System

### Supported Types

- integer: 64-bit integers
- float: Double-precision floating point
- string: UTF-8 encoded text
- boolean: True/False
- date: Date only (YYYY-MM-DD)
- datetime: Date and time with timezone
- json: Structured JSON data
### Type Conversions

Automatic conversions:
- String → Number (when parseable)
- Number → String (always possible)
- String → Date (with format specification)
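These rules might look like the following sketch (illustrative only; the `convert` helper is not Querri’s converter):

```python
from datetime import datetime

def convert(value, target_type: str, date_format: str = "%Y-%m-%d"):
    """Apply the automatic conversion rules described above (sketch)."""
    if target_type in ("integer", "float"):
        # String -> Number, when parseable (raises ValueError otherwise)
        return int(value) if target_type == "integer" else float(value)
    if target_type == "string":
        # Number -> String is always possible
        return str(value)
    if target_type == "date":
        # String -> Date requires a format specification
        return datetime.strptime(value, date_format).date()
    raise ValueError(f"Unsupported conversion target: {target_type}")
```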
### Type Inference

QDF automatically infers types from data:

```python
def infer_column_types(data: list[list]) -> list[dict]:
    """Infer types from sample data."""
```
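A naive implementation of this step might look like the sketch below. It is simplified (it checks only booleans, integers, and floats, and falls back to string) and takes column names as an extra parameter for illustration; it is not the real signature:

```python
def infer_column_types(data, column_names):
    """Infer a type for each column from sample rows (illustrative sketch)."""
    def infer_one(values):
        non_null = [v for v in values if v is not None]
        # bool is a subclass of int in Python, so check it first
        if all(isinstance(v, bool) for v in non_null):
            return "boolean"
        if all(isinstance(v, int) and not isinstance(v, bool) for v in non_null):
            return "integer"
        if all(isinstance(v, (int, float)) and not isinstance(v, bool)
               for v in non_null):
            return "float"
        return "string"

    columns = list(zip(*data))  # transpose rows -> columns
    return [
        {"name": name, "type": infer_one(col), "nullable": None in col}
        for name, col in zip(column_names, columns)
    ]

sample = [[1, 19.99, "alice"], [2, 5.00, None]]
types = infer_column_types(sample, ["id", "price", "name"])
```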
## Memory Management

### Memory Limits

- Single QDF: 1 GB default
- Step execution: 2 GB default
- Total per user: 5 GB concurrent
### Out-of-Memory Handling

When memory limits are exceeded:
- Spill to disk: Write chunks to temporary files
- Stream processing: Process in smaller chunks
- Sampling: Work with data sample for preview
- Abort: Cancel operation if limits exceeded
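The spill-to-disk fallback can be sketched as follows. This is an illustrative sketch (chunking by row count, pickle serialization, and the helper names are all assumptions), not Querri’s implementation:

```python
import os
import pickle
import tempfile

def spill_to_disk(rows, chunk_size):
    """Write rows to temporary chunk files; return the file paths."""
    paths = []
    for i in range(0, len(rows), chunk_size):
        f = tempfile.NamedTemporaryFile(delete=False, suffix=".chunk")
        pickle.dump(rows[i:i + chunk_size], f)
        f.close()
        paths.append(f.name)
    return paths

def stream_chunks(paths):
    """Yield chunks back from disk, deleting each file after use."""
    for path in paths:
        with open(path, "rb") as f:
            yield pickle.load(f)
        os.remove(path)

rows = [[i, i * 1.5] for i in range(10)]
paths = spill_to_disk(rows, chunk_size=4)
restored = [row for chunk in stream_chunks(paths) for row in chunk]
```

Only one chunk is resident in memory at a time during `stream_chunks`, which is the point of the fallback; deleting each file after use doubles as part of the cleanup described below.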
### Cleanup

Automatic cleanup of temporary data:
- After successful execution
- On error or cancellation
- On expired TTL (24 hours)
## Error Handling

### Data Errors

- Type mismatch: Convert or reject
- Missing values: Fill, drop, or error
- Invalid values: Skip or error
- Constraint violations: Rollback or skip
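For example, the missing-values policy could be sketched as a single helper (illustrative; the function and strategy names are assumptions):

```python
def handle_missing(rows, column_index, strategy, fill_value=None):
    """Apply a missing-value policy: 'fill', 'drop', or 'error' (sketch)."""
    if strategy == "fill":
        return [
            [fill_value if i == column_index and v is None else v
             for i, v in enumerate(row)]
            for row in rows
        ]
    if strategy == "drop":
        return [row for row in rows if row[column_index] is not None]
    if strategy == "error":
        if any(row[column_index] is None for row in rows):
            raise ValueError("Missing value in required column")
        return rows
    raise ValueError(f"Unknown strategy: {strategy}")

rows = [[1, 100.0], [2, None], [3, 250.0]]
```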
### Processing Errors

```python
try:
    result = await step.execute(input_qdf)
except DataError as e:
    # Handle data quality issues
    logger.error(f"Data error: {e}")
    raise
except ProcessingError as e:
    # Handle processing failures
    logger.error(f"Processing error: {e}")
    raise
```
## Monitoring

### Metrics Tracked

- Execution time per step
- Memory usage per step
- Row count processed
- Error rate
- Cache hit rate
### Logging

Structured logs for each step:

```json
{
  "step_id": "step_123",
  "step_type": "filter",
  "execution_time_ms": 234,
  "input_rows": 10000,
  "output_rows": 3500,
  "memory_mb": 45
}
```