Data Processing Pipeline

Querri’s data processing engine is built around the QDF (Querri Data Frame) format, optimized for large-scale data transformations.

QDF is Querri’s internal data structure for representing tabular data.

{
  "columns": [
    {
      "name": "customer_id",
      "type": "integer",
      "nullable": false
    },
    {
      "name": "revenue",
      "type": "float",
      "nullable": true
    }
  ],
  "data": [
    [1, 1500.00],
    [2, 2300.50],
    [3, 890.25]
  ],
  "row_count": 3,
  "metadata": {
    "source": "mysql_connector",
    "timestamp": "2024-10-21T10:00:00Z"
  }
}

Key properties of QDF:

  • Type safety: Enforced column types
  • Streaming support: Process data in chunks
  • Memory efficient: Optimized storage format
  • Serializable: Easy to store in MongoDB
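These properties can be illustrated with a minimal dataclass sketch. This is illustrative only: the real QDF implementation is internal to Querri, and the class and method names below are assumptions matching the JSON shape shown above.

```python
import json
from dataclasses import dataclass, field


@dataclass
class Column:
    name: str
    type: str
    nullable: bool = True


@dataclass
class QDF:
    columns: list[Column]
    data: list[list]
    metadata: dict = field(default_factory=dict)

    @property
    def row_count(self) -> int:
        return len(self.data)

    def to_json(self) -> str:
        # Serialize to the JSON document shape shown above (MongoDB-friendly)
        return json.dumps({
            "columns": [vars(c) for c in self.columns],
            "data": self.data,
            "row_count": self.row_count,
            "metadata": self.metadata,
        })


qdf = QDF(
    columns=[Column("customer_id", "integer", False), Column("revenue", "float")],
    data=[[1, 1500.00], [2, 2300.50], [3, 890.25]],
)
```

Serializing through plain JSON is what makes the format easy to store in MongoDB; typed `Column` entries are what enforce the schema.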
User → Frontend → API → Step Executor

The user triggers step execution from the frontend, which calls the backend API.

The step executor validates:

  • Step configuration is complete
  • Input data is available
  • User has permissions
  • Resources are available
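The four checks above can be sketched as a single pre-flight function. The field names, exception type, and signature here are illustrative assumptions, not Querri's actual API:

```python
class StepValidationError(Exception):
    """Raised when a step fails pre-execution validation (hypothetical)."""


def validate_step(step: dict, user_permissions: set[str],
                  available_memory_mb: int) -> None:
    """Run the four pre-execution checks; raise on the first failure."""
    # 1. Step configuration is complete
    if not step.get("config"):
        raise StepValidationError("step configuration is incomplete")
    # 2. Input data is available
    if step.get("requires_input", True) and not step.get("input_ids"):
        raise StepValidationError("input data is not available")
    # 3. User has permissions
    if step.get("required_permission") not in user_permissions:
        raise StepValidationError("user lacks permission")
    # 4. Resources are available
    if available_memory_mb < step.get("memory_mb", 0):
        raise StepValidationError("insufficient resources")
```

Failing fast here keeps bad steps from consuming executor resources.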

For steps with inputs:

  • Load QDF from previous steps
  • Deserialize from MongoDB
  • Apply any required transformations
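A sketch of that input-loading path, assuming an async MongoDB driver such as Motor (the collection name, document fields, and `deserialize` placeholder are illustrative):

```python
def deserialize(payload: dict) -> dict:
    # Placeholder: real code would rebuild a QDF object from the stored document
    return payload


async def load_inputs(step: dict, db, transforms=()) -> list:
    """Load, deserialize, and transform the input QDFs for a step."""
    inputs = []
    for input_id in step["input_ids"]:
        # Load QDF from previous steps / deserialize from MongoDB
        doc = await db.qdf_store.find_one({"step_id": input_id})
        qdf = deserialize(doc["payload"])
        # Apply any required transformations
        for transform in transforms:
            qdf = transform(qdf)
        inputs.append(qdf)
    return inputs
```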

Execute the step’s core logic:

A transform step:

async def execute(self, input_qdf: QDF) -> QDF:
    # Apply transformations
    result = transform(input_qdf, self.config)
    return result

An analysis step:

async def execute(self, input_qdf: QDF) -> QDF:
    # Perform analysis
    result = analyze(input_qdf, self.config)
    return result

After execution completes:

  • Serialize QDF to MongoDB
  • Update step status
  • Emit completion event
  • Notify frontend via WebSocket
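The four post-execution tasks can be sketched as one helper. Every collaborator here (`db`, `events`, `ws`) is an assumed interface, and `serialize` is a placeholder; none of these are Querri's actual APIs:

```python
def serialize(qdf: dict) -> dict:
    # Placeholder: real code would emit the QDF JSON document shown earlier
    return dict(qdf)


async def finalize_step(step: dict, result_qdf: dict, db, events, ws) -> None:
    """Persist results, mark the step done, and notify listeners."""
    # 1. Serialize QDF to MongoDB
    await db.qdf_store.insert_one({"step_id": step["id"],
                                   "payload": serialize(result_qdf)})
    # 2. Update step status
    await db.steps.update_one({"_id": step["id"]},
                              {"$set": {"status": "complete"}})
    # 3. Emit completion event
    await events.emit("step.completed", {"step_id": step["id"]})
    # 4. Notify frontend via WebSocket
    await ws.send_json({"type": "step_complete", "step_id": step["id"]})
```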
def filter_rows(qdf: QDF, condition: dict) -> QDF:
    """
    Filter rows based on condition.

    condition = {
        "column": "revenue",
        "operator": "greater_than",
        "value": 1000
    }
    """

def select_columns(qdf: QDF, columns: list[str]) -> QDF:
    """Keep only specified columns."""

def add_calculated_column(
    qdf: QDF,
    name: str,
    expression: str
) -> QDF:
    """
    Add new column with calculation.

    expression = "quantity * price"
    """

def group_by(
    qdf: QDF,
    group_columns: list[str],
    aggregations: list[dict]
) -> QDF:
    """
    Group and aggregate data.

    aggregations = [
        {"column": "revenue", "function": "sum"},
        {"column": "orders", "function": "count"}
    ]
    """

def join(
    left_qdf: QDF,
    right_qdf: QDF,
    join_type: str,
    on: dict
) -> QDF:
    """
    Join two QDFs.

    join_type: "inner" | "left" | "right" | "outer"
    on = {
        "left": "customer_id",
        "right": "id"
    }
    """

For large datasets:

async def process_in_chunks(qdf: QDF, chunk_size: int = 10000):
    """Process data in chunks to manage memory."""
    for chunk in qdf.iter_chunks(chunk_size):
        result_chunk = process_chunk(chunk)
        yield result_chunk

Steps don’t execute until explicitly requested:

# Build pipeline
step1 = LoadData(source="database")
step2 = FilterData(step1, condition=...)
step3 = Aggregate(step2, groupby=...)
# Execute only when needed
result = await step3.execute()

Frequently accessed data is cached in Redis:

cache_key = f"qdf:{step_uuid}:{version}"
cached = await redis.get(cache_key)
if cached:
    return deserialize(cached)

Independent steps can run in parallel:

async def execute_parallel(steps: list[Step]):
    tasks = [step.execute() for step in steps]
    results = await asyncio.gather(*tasks)
    return results

Supported column types:

  • integer: 64-bit integers
  • float: Double-precision floating point
  • string: UTF-8 encoded text
  • boolean: True/False
  • date: Date only (YYYY-MM-DD)
  • datetime: Date and time with timezone
  • json: Structured JSON data

Automatic conversions:

  • String → Number (when parseable)
  • Number → String (always possible)
  • String → Date (with format specification)
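A minimal sketch of those conversions, assuming a hypothetical `convert_value` helper (the real conversion logic is not shown in this doc):

```python
from datetime import datetime


def convert_value(value, target_type: str, date_format: str = "%Y-%m-%d"):
    """Apply one of the automatic conversions listed above.

    Raises ValueError when a string is not parseable as the target type.
    """
    if target_type == "integer":
        return int(value)            # String -> Number (when parseable)
    if target_type == "float":
        return float(value)
    if target_type == "string":
        return str(value)            # Number -> String (always possible)
    if target_type == "date":
        # String -> Date (with format specification)
        return datetime.strptime(value, date_format).date()
    raise ValueError(f"unsupported target type: {target_type}")
```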

QDF automatically infers types from data:

def infer_column_types(data: list[list]) -> list[dict]:
    """Infer types from sample data."""

Memory limits:

  • Single QDF: 1 GB default
  • Step execution: 2 GB default
  • Total per user: 5 GB concurrent

When memory limits are exceeded:

  1. Spill to disk: Write chunks to temporary files
  2. Stream processing: Process in smaller chunks
  3. Sampling: Work with data sample for preview
  4. Abort: Cancel operation if limits exceeded
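The first strategy, spilling to disk, can be sketched like this (a simplified sketch using JSON temp files; a real implementation would use an efficient binary format and the configured chunk sizes):

```python
import json
import os
import tempfile


def spill_to_disk(rows: list[list], chunk_size: int = 10000) -> list[str]:
    """Write row chunks to temporary files and return their paths."""
    paths = []
    for start in range(0, len(rows), chunk_size):
        fd, path = tempfile.mkstemp(suffix=".qdf.json")
        with os.fdopen(fd, "w") as f:
            json.dump(rows[start:start + chunk_size], f)
        paths.append(path)
    return paths


def read_spilled(paths: list[str]):
    """Stream rows back from the spilled chunks, deleting each file after use."""
    for path in paths:
        with open(path) as f:
            yield from json.load(f)
        os.remove(path)  # cleanup of temporary data
```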

Automatic cleanup of temporary data:

  • After successful execution
  • On error or cancellation
  • On expired TTL (24 hours)

Data quality issues are handled according to the configured policy:

  • Type mismatch: Convert or reject
  • Missing values: Fill, drop, or error
  • Invalid values: Skip or error
  • Constraint violations: Rollback or skip

try:
    result = await step.execute(input_qdf)
except DataError as e:
    # Handle data quality issues
    logger.error(f"Data error: {e}")
    raise
except ProcessingError as e:
    # Handle processing failures
    logger.error(f"Processing error: {e}")
    raise

Metrics tracked for each execution:

  • Execution time per step
  • Memory usage per step
  • Row count processed
  • Error rate
  • Cache hit rate

Structured logs for each step:

{
  "step_id": "step_123",
  "step_type": "filter",
  "execution_time_ms": 234,
  "input_rows": 10000,
  "output_rows": 3500,
  "memory_mb": 45
}