Skip to content

Python SDK

The Querri Python SDK (querri) provides both synchronous and asynchronous clients for embedding Querri analytics, managing users and access policies, querying data, and controlling embed sessions from Python applications.

Requires Python 3.9+. Built on httpx and Pydantic v2. Fully typed.

Terminal window
pip install querri
import os
from querri import Querri
client = Querri(
api_key=os.environ["QUERRI_API_KEY"],
org_id=os.environ["QUERRI_ORG_ID"],
)
for project in client.projects.list():
print(project.name)

Or let the SDK read from environment variables automatically:

Terminal window
export QUERRI_API_KEY="qk_live_..."
export QUERRI_ORG_ID="org_..."
from querri import Querri
client = Querri() # reads QUERRI_API_KEY and QUERRI_ORG_ID from env

The client supports context managers for automatic cleanup:

with Querri() as client:
users = client.users.list()

API keys start with qk_ and are scoped to a single organization. Generate one from Settings > API Keys in the Querri web app.

Every request requires both an API key and an organization ID. Pass them to the constructor or set the QUERRI_API_KEY and QUERRI_ORG_ID environment variables. Explicit arguments always override environment variables.

ParameterEnv VariableDefaultDescription
api_keyQUERRI_API_KEY(required)Your qk_ API key
org_idQUERRI_ORG_ID(required)Organization ID
hostQUERRI_HOSThttps://app.querri.comServer host
timeoutQUERRI_TIMEOUT30.0Request timeout (seconds)
max_retriesQUERRI_MAX_RETRIES3Retry attempts for transient errors
client = Querri(
api_key="qk_live_...",
org_id="org_...",
host="http://localhost", # for local development
timeout=60.0,
max_retries=5,
)

client.embed.get_session() is the flagship method. It combines user resolution, access policy application, and embed session creation into one call — the complete white-label embedding workflow.

session = client.embed.get_session(
user="customer-42",
ttl=7200,
)
print(session["session_token"])
session = client.embed.get_session(
user={
"external_id": "customer-42",
"email": "alice@customer.com",
"first_name": "Alice",
"last_name": "Smith",
"role": "member",
},
origin="https://app.customer.com",
)

Specify sources and filters directly. The SDK creates a deterministic auto-named policy (or reuses an existing one with the same spec):

session = client.embed.get_session(
user={
"external_id": "customer-42",
"email": "alice@customer.com",
},
access={
"sources": ["src_sales", "src_marketing"],
"filters": {
"region": ["APAC", "EMEA"],
"department": "Sales",
},
},
origin="https://app.customer.com",
ttl=3600,
)
session = client.embed.get_session(
user="customer-42",
access={"policy_ids": ["pol_abc", "pol_def"]},
)

get_session() returns a plain dict (not a Pydantic model):

{
"session_token": "es_...",
"expires_in": 3600,
"user_id": "usr_...",
"external_id": "customer-42",
}

Access policies control row-level security (RLS) — which data sources a user can see and which rows are visible.

policy = client.policies.create(
name="APAC Sales",
description="Restricts data to APAC region",
source_ids=["src_abc", "src_def"],
row_filters=[
{"column": "region", "values": ["APAC"]},
{"column": "department", "values": ["Sales", "Marketing"]},
],
)
client.policies.assign_users(policy.id, user_ids=[user.id])

Create a policy and assign users in one call, using a friendlier dict syntax for row filters:

policy = client.policies.setup(
name="APAC Sales Team",
sources=["src_abc", "src_def"],
row_filters={"region": ["APAC"], "department": "Sales"},
users=["usr_111", "usr_222"],
)
# List policies
policies = client.policies.list()
# Filter by name
policies = client.policies.list(name="APAC Sales")
# Get policy details
policy = client.policies.get("pol_...")
# Update
client.policies.update(policy.id, name="New Name")
# Remove a user
client.policies.remove_user(policy.id, "usr_...")
# Delete policy
client.policies.delete(policy.id)
# Discover filterable columns for a source
columns = client.policies.columns(source_id="src_abc")
# Preview resolved access (effective filters for a user + source)
resolved = client.policies.resolve(user_id="usr_...", source_id="src_abc")
  • Same column, multiple policies = OR. Policies for region = US and region = EU means the user sees US or EU rows.
  • Different columns = AND. Policies for region = US and department = Sales means only rows matching both.
  • No policies assigned = full access (permissive default).
# Create a user
user = client.users.create(
email="alice@example.com",
external_id="cust-42",
first_name="Alice",
last_name="Smith",
role="member", # "member" or "admin"
)
print(user.id, user.email)
# Get a user by ID
user = client.users.get("usr_...")
# Idempotent get-or-create by external ID
user = client.users.get_or_create(
external_id="cust-42",
email="alice@example.com",
first_name="Alice",
)
# List users (auto-paginates)
for user in client.users.list():
print(user.email)
# Filter by external ID
page = client.users.list(external_id="cust-42")
user = page.data[0]
# Update
updated = client.users.update(user.id, first_name="Alicia")
# Delete
client.users.delete(user.id)
# Create a source with inline JSON data
source = client.data.create_source(
name="Sales Data",
rows=[
{"region": "US", "revenue": 100000},
{"region": "EU", "revenue": 85000},
],
)
# List available data sources
sources = client.data.sources()
# Get source metadata and schema
source = client.data.source("src_...")
# Query a source with SQL (RLS-enforced)
result = client.data.query(
sql="SELECT region, SUM(revenue) FROM data GROUP BY region",
source_id="src_...",
page=1,
page_size=100,
)
print(result.data)
print(result.total_rows)
# Get paginated source data
page = client.data.source_data("src_...", page=1, page_size=100)
# Delete a source
client.data.delete_source("src_...")

Manage connectors via the Sources resource

Section titled “Manage connectors via the Sources resource”
# List available connector types
connectors = client.sources.list_connectors()
# Create a data source from a connector
source = client.sources.create(
name="My Postgres",
connector_id="connector-uuid",
config={"host": "db.example.com", "port": 5432},
)
# List, update, delete
sources = client.sources.list()
client.sources.update("src_...", name="Renamed Source")
client.sources.delete("src_...")
# Trigger a sync
client.sources.sync("src_...")

If you need lower-level session control beyond get_session():

# Create an embed session
session = client.embed.create_session(
user_id="usr_...",
origin="https://app.customer.com",
ttl=3600, # seconds (15 min to 24 hours)
)
print(session.session_token) # "es_..."
# Refresh before expiry (old token is revoked)
new_session = client.embed.refresh_session(
session_token=session.session_token,
)
# List active sessions
session_list = client.embed.list_sessions(limit=50)
for s in session_list.data:
print(s.session_token, s.user_id)
# Revoke a session
client.embed.revoke_session(session_token=session.session_token)
# List projects (auto-paginates)
for project in client.projects.list():
print(project.name)
# Create a project
project = client.projects.create(
name="Q4 Analysis",
user_id="usr_...",
description="Quarterly sales analysis",
)
# Submit for execution
run = client.projects.run(project.id, user_id="usr_...")
# Check execution status
status = client.projects.run_status(project.id)
# Get step result data (paginated, RLS-enforced)
data = client.projects.get_step_data(
project.id, step.id, page=1, page_size=100,
)
# List and manage dashboards
dashboards = client.dashboards.list()
dashboard = client.dashboards.create(name="Sales Overview")
client.dashboards.refresh(dashboard.id)

Stream AI responses within a project’s chat:

# Stream response chunk by chunk
stream = client.projects.chats.stream(
project.id,
chat.id,
prompt="Summarize the sales data by region",
user_id="usr_...",
)
for chunk in stream:
print(chunk, end="", flush=True)
# Or get the full response at once
full_text = stream.text()

The AsyncQuerri client mirrors the sync API with async/await:

import asyncio
from querri import AsyncQuerri
async def main():
async with AsyncQuerri() as client:
# Auto-paginate
async for project in client.projects.list():
print(project.name)
# Streaming
stream = await client.projects.chats.stream(
project_id, chat_id,
prompt="Summarize the data",
user_id="usr_...",
)
async for chunk in stream:
print(chunk, end="", flush=True)
# get_session works the same way
session = await client.embed.get_session(
user={"external_id": "cust-42", "email": "a@b.com"},
access={"sources": ["src_sales"]},
)
asyncio.run(main())

All API errors inherit from APIError and include structured attributes:

from querri import (
APIError,
AuthenticationError,
NotFoundError,
RateLimitError,
ValidationError,
ServerError,
)
try:
project = client.projects.get("nonexistent-id")
except NotFoundError as e:
print(f"Not found: {e.message} (status={e.status})")
except RateLimitError as e:
print(f"Rate limited — retry after {e.retry_after}s")
except AuthenticationError:
print("Invalid API key")
except ValidationError as e:
print(f"Bad request: {e.message}")
except ServerError as e:
print(f"Server error: {e.status}")
except APIError as e:
print(f"API error {e.status}: {e.message}")
print(f" type={e.type}, code={e.code}, request_id={e.request_id}")
QuerriError
├── APIError
│ ├── AuthenticationError (401)
│ ├── PermissionError (403)
│ ├── NotFoundError (404)
│ ├── ValidationError (400)
│ ├── ConflictError (409)
│ ├── RateLimitError (429) — has retry_after attribute
│ └── ServerError (500+)
├── StreamError
│ ├── StreamTimeoutError
│ └── StreamCancelledError
└── ConfigError

The SDK automatically retries on 429 and 5xx errors (up to max_retries, default 3).

A FastAPI route that creates a per-tenant embed session:

from fastapi import FastAPI, Depends
from querri import AsyncQuerri
app = FastAPI()
querri = AsyncQuerri() # reads from environment variables
@app.get("/api/querri-token")
async def get_querri_token(current_user=Depends(get_current_user)):
session = await querri.embed.get_session(
user={
"external_id": str(current_user.id),
"email": current_user.email,
"first_name": current_user.first_name,
"last_name": current_user.last_name,
},
access={
"sources": [os.environ["QUERRI_SOURCE_ID"]],
"filters": {"tenant_id": str(current_user.tenant_id)},
},
)
return {"sessionToken": session["session_token"]}

Each tenant sees only their rows. One dataset, one embed, automatic filtering. Pair this with the Embed SDK on the frontend to render the iframe.

  • API keys (qk_*) should only be used server-side — never expose them in client code or version control
  • Session tokens (es_*) are safe for the browser — they are scoped and time-limited
  • Set origin in get_session() to restrict which domains can use the session token
  • Use environment variables for credentials
  • Rotate API keys periodically via Settings > API Keys

The Python SDK wraps the Querri V1 API endpoints. For the complete endpoint reference, see API Reference.

SDK ResourceAPI Endpoints
client.users/v1/embed/users/*
client.embed/v1/embed/sessions/*
client.policies/v1/access/policies/*, /v1/access/resolve, /v1/access/columns
client.files/v1/files/*
client.projects/v1/projects/*
client.dashboards/v1/dashboards/*
client.data/v1/data/*
client.sharing/v1/projects/*/shares, /v1/dashboards/*/shares
client.sources/v1/sources/*, /v1/connectors
client.keys/v1/keys/*
client.audit/v1/audit/*
client.usage/v1/usage/*