# REST API Reference

The Rust Axum backend exposes a REST API on port 3000. Python operations (catalog, health, table ops, agent) are invoked in-process via PyO3 (embedded Python); configuration and streaming pipelines are handled in pure Rust.
## Base URL
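The base URL value was lost in extraction; given the port noted above, a local deployment would typically be reached at (host assumed):

```
http://localhost:3000
```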
## Endpoints

### Health

#### GET /api/health

Health check endpoint.
Response: 200 OK
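The response body is not shown in the source; a minimal sketch, assuming the same `success` flag used by the other endpoints:

```json
{"success": true, "status": "healthy"}
```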
### Catalog

#### GET /api/tables

List all Iceberg tables.
Response:
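The response body is missing here; a plausible shape, assuming the endpoint mirrors the `success` flag and dotted table names used elsewhere in this reference:

```json
{
  "success": true,
  "tables": ["db.events", "db.output"]
}
```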
#### GET /api/table/{table_name}/schema

Get the table schema.

Response:

```json
{
  "success": true,
  "table": "db.events",
  "columns": [
    {"name": "id", "type": "bigint", "nullable": false, "comment": null},
    {"name": "ts", "type": "timestamp", "nullable": true, "comment": null}
  ]
}
```
#### GET /api/table/{table_name}/health

Get the table health report.

Response:

```json
{
  "success": true,
  "table": "db.events",
  "snapshot_count": 15,
  "data_file_count": 42,
  "total_size_bytes": 1048576,
  "avg_file_size_bytes": 24966,
  "small_file_count": 30,
  "recommendation": "Consider compaction"
}
```
### Operations

#### POST /api/compact

Run table compaction.
Request:
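The request body was not captured; judging from the response, which echoes the table name, a minimal sketch would be:

```json
{
  "table": "drls.db.events"
}
```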
Response:
```json
{
  "success": true,
  "operation": "compact",
  "table": "drls.db.events",
  "details_json": "{\"rewritten_files\": 10, \"new_files\": 2}"
}
```
#### POST /api/expire

Expire snapshots.
Request:
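The request body is missing from the source; a sketch assuming a `table` field as in the other operations, plus hypothetical retention parameters (`older_than_days` and `retain_last` are not confirmed by the source):

```json
{
  "table": "drls.db.events",
  "older_than_days": 7,
  "retain_last": 1
}
```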
#### POST /api/evolve/add-column

Add a column to a table.

Request:

```json
{
  "table": "drls.db.events",
  "name": "source",
  "type": "STRING",
  "after": "ts",
  "comment": "Event source"
}
```
#### POST /api/evolve/rename-column

Rename a column.
Request:
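The request body is missing; a sketch modeled on the add-column request, with the `new_name` field assumed:

```json
{
  "table": "drls.db.events",
  "name": "source",
  "new_name": "origin"
}
```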
#### POST /api/evolve/drop-column

Drop a column.
Request:
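The request body is missing; presumably the table and column name, as in the other evolve endpoints (field names assumed):

```json
{
  "table": "drls.db.events",
  "name": "source"
}
```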
### Streaming Pipelines

#### POST /api/pipeline/start

Start a streaming pipeline.

Request:

```json
{
  "source_table": "drls.db.events",
  "sink_table": "drls.db.output",
  "checkpoint_location": "/tmp/checkpoint",
  "trigger_interval": "10 seconds"
}
```
Response:
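The response body is missing; since the stop and status endpoints take a {pipeline_id}, the start response presumably returns one (shape assumed):

```json
{
  "success": true,
  "pipeline_id": "..."
}
```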
#### POST /api/pipeline/{pipeline_id}/stop

Stop a pipeline.

#### GET /api/pipeline/{pipeline_id}/status

Get pipeline status.

#### GET /api/pipelines

List all pipelines.
Response:
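The response body is missing; a plausible shape, assuming each entry carries the pipeline id and a status string (not confirmed by the source):

```json
{
  "pipelines": [
    {"pipeline_id": "...", "status": "running"}
  ]
}
```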
### WebSocket

#### GET /api/pipeline/monitor

WebSocket endpoint for real-time pipeline metrics.
Message format (server → client):
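The message schema is missing from the source; a hypothetical sketch only (none of the field names are confirmed):

```json
{
  "pipeline_id": "...",
  "status": "running",
  "metrics": {"rows_processed": 1024}
}
```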
### Pipeline Definitions

#### POST /api/pipeline-defs

Create a new pipeline definition with an initial version in development.

Min role: DataEngineer

Request:

```json
{
  "name": "my-pipeline",
  "description": "Event processing pipeline",
  "nodes": [{"id": "n1", "type": "source", "config": {}, "position": {"x": 0, "y": 0}}],
  "edges": []
}
```

Response: 201 Created

```json
{
  "definition": {"id": "def-...", "name": "my-pipeline", "description": "...", "created_by": "user-1", "created_at": "...", "updated_at": "..."},
  "version": {"id": "ver-...", "definition_id": "def-...", "version": 1, "format_version": 1, "environment": "development", "...": "..."}
}
```
#### GET /api/pipeline-defs

List all pipeline definitions. Optionally filter by environment.

Min role: Auditor

Query params: `?environment=production` (optional)

#### GET /api/pipeline-defs/{def_id}

Get a definition with the latest version per environment.

Min role: Auditor

#### PUT /api/pipeline-defs/{def_id}

Update definition name/description. Blocked if any version is in UAT, production, or retired.

Min role: DataEngineer

#### DELETE /api/pipeline-defs/{def_id}

Delete a definition. Only allowed when all versions are in development or QA.

Min role: DataEngineer
### Pipeline Versions

#### POST /api/pipeline-defs/{def_id}/versions

Create a new version in development.

Min role: DataEngineer

Request:

```json
{
  "nodes": [{"id": "n1", "type": "source", "config": {}, "position": {"x": 0, "y": 0}}],
  "edges": [{"id": "e1", "source": "n1", "target": "n2"}]
}
```
#### GET /api/pipeline-defs/{def_id}/versions

List all versions of a definition.

Min role: Auditor

#### GET /api/pipeline-defs/{def_id}/versions/{ver_id}

Get a specific version with full node/edge data.

Min role: Auditor
### Environment Transitions

#### POST /api/pipeline-defs/{def_id}/versions/{ver_id}/promote

Promote a version to the next environment. Development → QA is direct. QA → UAT and UAT → Production create an approval request and return 202 Accepted.

Min role: DataEngineer

Response (direct): 200 OK

Response (approval required): 202 Accepted
#### POST /api/pipeline-defs/{def_id}/versions/{ver_id}/demote

Demote a version from UAT back to QA.

Min role: LeadDataEngineer

#### POST /api/pipeline-defs/{def_id}/versions/{ver_id}/retire

Retire a version from production.

Min role: LeadDataEngineer
### Approvals

#### GET /api/approvals

List promotion requests. Optionally filter by status.

Min role: LeadDataEngineer

Query params: `?status=pending` (optional; one of pending, approved, rejected)
#### GET /api/approvals/{req_id}

Get approval request details with the version snapshot.

Min role: LeadDataEngineer

#### POST /api/approvals/{req_id}/approve

Approve a pending request. Moves the version to the target environment.

Min role: LeadDataEngineer
Request:
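The request body is missing; an optional comment field is a reasonable guess (field name assumed):

```json
{
  "comment": "Looks good"
}
```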
#### POST /api/approvals/{req_id}/reject

Reject a pending request. A reason is required.

Min role: LeadDataEngineer
Request:
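The request body is missing; per the description a reason is required, so presumably (field name assumed):

```json
{
  "reason": "Missing test coverage"
}
```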
#### GET /api/approvals/{req_id}/comments

List comments on an approval request.

Min role: DataEngineer

#### POST /api/approvals/{req_id}/comments

Add a comment to an approval request.

Min role: DataEngineer
Request:
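The request body is missing; a sketch with an assumed `text` field:

```json
{
  "text": "Please double-check the checkpoint location."
}
```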
### Import / Export

#### GET /api/pipeline-defs/{def_id}/versions/{ver_id}/export

Export a version as JSON.

Min role: Auditor

Response:

```json
{
  "format_version": 1,
  "name": "my-pipeline",
  "description": "...",
  "version": 3,
  "nodes": [],
  "edges": [],
  "exported_at": "2026-02-19T..."
}
```
#### POST /api/pipeline-defs/import

Import a pipeline from JSON. Creates a new definition with version 1 in development. Rejects files with a `format_version` higher than the current system version.

Min role: DataEngineer

Request:

```json
{
  "format_version": 1,
  "name": "imported-pipeline",
  "description": "...",
  "nodes": [],
  "edges": []
}
```
### Notifications

#### GET /api/notifications/pending-approvals

Get the count and list of pending approval requests.

Min role: LeadDataEngineer

Response:

```json
{
  "count": 2,
  "requests": [{"id": "pr-...", "version_id": "ver-...", "from_environment": "qa", "to_environment": "uat", "status": "pending", "...": "..."}]
}
```
### Agent

#### POST /api/agent/chat

Send a prompt to TRex.

Request:

```json
{
  "prompt": "How healthy is the events table?",
  "provider": "ollama",
  "model": "llama3:8b",
  "api_base": null
}
```

Response:

```json
{
  "success": true,
  "response": "The events table has 15 snapshots and 42 data files...",
  "tool_calls": [
    {
      "tool": "get_table_health",
      "arguments_json": "{\"table\": \"drls.db.events\"}",
      "result_json": "{\"snapshot_count\": 15, ...}"
    }
  ]
}
```
### Notebook Service

Notebook endpoints require a Kubernetes cluster. When no cluster is available, these endpoints are not registered.

#### POST /api/notebook/start

Create a marimo notebook pod and service for the authenticated user. Idempotent: returns the existing session if a pod is already running.

Min role: DataEngineer
Response: 200 OK
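The response body is missing; presumably it points at the reverse-proxy path for the user's pod (shape and field names assumed):

```json
{
  "url": "/notebook/user-1/",
  "status": "running"
}
```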
#### POST /api/notebook/stop

Delete the authenticated user's notebook pod and service.

Min role: DataEngineer

Response: 204 No Content

#### GET /api/notebook/status

Poll the authenticated user's notebook pod phase.

Min role: DataEngineer
Response:
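The response body is missing; given the status values listed next, a minimal sketch:

```json
{
  "status": "running"
}
```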
Status values: `not_found`, `pending`, `running`, `failed`
#### ANY /notebook/{user_id}/*

Reverse proxy to the user's marimo pod. Handles both HTTP and WebSocket connections. The {user_id} in the path must match the authenticated user.

### Configuration

#### GET /api/config

Get the current catalog configuration.

Response:

```json
{
  "configured": true,
  "catalog_type": "hadoop",
  "warehouse": "/tmp/warehouse",
  "catalog_name": "drls",
  "uri": null,
  "extra_props": {}
}
```
#### PUT /api/config

Set the catalog configuration.
Request:
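The request body is missing; presumably it mirrors the writable fields of the GET /api/config response (an assumption, not confirmed by the source):

```json
{
  "catalog_type": "hadoop",
  "warehouse": "/tmp/warehouse",
  "catalog_name": "drls",
  "uri": null,
  "extra_props": {}
}
```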