REST API Reference

The Rust Axum backend exposes a REST API on port 3000. Catalog, health, table-operation, and agent requests are dispatched to embedded Python via PyO3; configuration and streaming pipelines are handled in pure Rust.

Base URL

http://localhost:3000/api

Endpoints

Health

GET /api/health

Health check endpoint.

Response: 200 OK


Catalog

GET /api/tables

List all Iceberg tables.

Response:

{
  "namespaces": ["db", "analytics"],
  "tables": ["db.events", "db.users", "analytics.metrics"]
}
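
The `tables` field is a flat list of dotted names. A minimal client-side sketch (Python; the helper name is illustrative, not part of the API) that groups it by namespace:

```python
from collections import defaultdict

def tables_by_namespace(catalog_response: dict) -> dict:
    """Group the flat `tables` list from GET /api/tables by namespace."""
    grouped = defaultdict(list)
    for full_name in catalog_response["tables"]:
        # Everything before the last dot is treated as the namespace.
        namespace, _, table = full_name.rpartition(".")
        grouped[namespace].append(table)
    return dict(grouped)

catalog_response = {
    "namespaces": ["db", "analytics"],
    "tables": ["db.events", "db.users", "analytics.metrics"],
}
```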

GET /api/table/{table_name}/schema

Get table schema.

Response:

{
  "success": true,
  "table": "db.events",
  "columns": [
    {"name": "id", "type": "bigint", "nullable": false, "comment": null},
    {"name": "ts", "type": "timestamp", "nullable": true, "comment": null}
  ]
}
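
Each column entry carries `name`, `type`, `nullable`, and `comment`. A small sketch (hypothetical helper, mirroring the sample response) that pulls out the required columns:

```python
def non_nullable_columns(schema_response: dict) -> list:
    """Names of columns the schema marks as required (nullable = false)."""
    return [c["name"] for c in schema_response["columns"] if not c["nullable"]]

schema_response = {
    "success": True,
    "table": "db.events",
    "columns": [
        {"name": "id", "type": "bigint", "nullable": False, "comment": None},
        {"name": "ts", "type": "timestamp", "nullable": True, "comment": None},
    ],
}
```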

GET /api/table/{table_name}/health

Get table health report.

Response:

{
  "success": true,
  "table": "db.events",
  "snapshot_count": 15,
  "data_file_count": 42,
  "total_size_bytes": 1048576,
  "avg_file_size_bytes": 24966,
  "small_file_count": 30,
  "recommendation": "Consider compaction"
}
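
The `recommendation` string is produced server-side; its threshold is not documented here. A client can still recompute the derived metrics from the raw counts, as this sketch does with the sample report:

```python
def small_file_ratio(report: dict) -> float:
    """Fraction of data files the report flags as small."""
    if report["data_file_count"] == 0:
        return 0.0
    return report["small_file_count"] / report["data_file_count"]

report = {
    "snapshot_count": 15,
    "data_file_count": 42,
    "total_size_bytes": 1048576,
    "avg_file_size_bytes": 24966,
    "small_file_count": 30,
}
```

Note that `avg_file_size_bytes` in the sample is simply `total_size_bytes` divided by `data_file_count`, rounded down.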

Operations

POST /api/compact

Run table compaction.

Request:

{
  "table": "drls.db.events",
  "strategy": "binpack",
  "options": {}
}

Response:

{
  "success": true,
  "operation": "compact",
  "table": "drls.db.events",
  "details_json": "{\"rewritten_files\": 10, \"new_files\": 2}"
}
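
`details_json` is a JSON-encoded string rather than a nested object, so clients need a second decode step. A minimal sketch against the sample response:

```python
import json

def compact_details(response: dict) -> dict:
    """Decode the nested details_json string from an operation response."""
    return json.loads(response["details_json"])

compact_response = {
    "success": True,
    "operation": "compact",
    "table": "drls.db.events",
    "details_json": '{"rewritten_files": 10, "new_files": 2}',
}
```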

POST /api/expire

Expire snapshots.

Request:

{
  "table": "drls.db.events",
  "retain_last": 5,
  "older_than": null
}

POST /api/evolve/add-column

Add a column to a table.

Request:

{
  "table": "drls.db.events",
  "name": "source",
  "type": "STRING",
  "after": "ts",
  "comment": "Event source"
}

POST /api/evolve/rename-column

Rename a column.

Request:

{
  "table": "drls.db.events",
  "old_name": "payload",
  "new_name": "data"
}

POST /api/evolve/drop-column

Drop a column.

Request:

{
  "table": "drls.db.events",
  "name": "source"
}

Streaming Pipelines

POST /api/pipeline/start

Start a streaming pipeline.

Request:

{
  "source_table": "drls.db.events",
  "sink_table": "drls.db.output",
  "checkpoint_location": "/tmp/checkpoint",
  "trigger_interval": "10 seconds"
}

Response:

{
  "pipeline_id": "abc-123",
  "status": "running",
  "batches_processed": 0,
  "rows_processed": 0
}

POST /api/pipeline/{pipeline_id}/stop

Stop a pipeline.

GET /api/pipeline/{pipeline_id}/status

Get pipeline status.

GET /api/pipelines

List all pipelines.

Response:

{
  "pipelines": [
    {"id": "abc-123", "status": "running", "source": "drls.db.events"}
  ]
}

WebSocket

GET /api/pipeline/monitor

WebSocket endpoint for real-time pipeline metrics.

Message format (server → client):

{
  "pipeline_id": "abc-123",
  "status": "running",
  "batches_processed": 42,
  "rows_processed": 10500
}
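
A dashboard typically folds this message stream into one latest snapshot per pipeline. A sketch of that reduction (plain dicts stand in for decoded WebSocket frames; a real client would feed it from the socket):

```python
def latest_metrics(messages):
    """Keep only the most recent monitor message per pipeline_id."""
    latest = {}
    for msg in messages:
        latest[msg["pipeline_id"]] = msg
    return latest

messages = [
    {"pipeline_id": "abc-123", "status": "running",
     "batches_processed": 41, "rows_processed": 10200},
    {"pipeline_id": "abc-123", "status": "running",
     "batches_processed": 42, "rows_processed": 10500},
]
```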

Pipeline Definitions

POST /api/pipeline-defs

Create a new pipeline definition with an initial version in development.

Min role: DataEngineer

Request:

{
  "name": "my-pipeline",
  "description": "Event processing pipeline",
  "nodes": [{"id": "n1", "type": "source", "config": {}, "position": {"x": 0, "y": 0}}],
  "edges": []
}

Response: 201 Created

{
  "definition": {"id": "def-...", "name": "my-pipeline", "description": "...", "created_by": "user-1", "created_at": "...", "updated_at": "..."},
  "version": {"id": "ver-...", "definition_id": "def-...", "version": 1, "format_version": 1, "environment": "development", "...": "..."}
}

GET /api/pipeline-defs

List all pipeline definitions. Optionally filter by environment.

Min role: Auditor

Query params: ?environment=production (optional)

GET /api/pipeline-defs/{def_id}

Get a definition with the latest version per environment.

Min role: Auditor

PUT /api/pipeline-defs/{def_id}

Update a definition's name and description. Blocked if any of its versions is in UAT or production, or has been retired.

Min role: DataEngineer

DELETE /api/pipeline-defs/{def_id}

Delete a definition. Only allowed when all versions are in development or QA.

Min role: DataEngineer


Pipeline Versions

POST /api/pipeline-defs/{def_id}/versions

Create a new version in development.

Min role: DataEngineer

Request:

{
  "nodes": [{"id": "n1", "type": "source", "config": {}, "position": {"x": 0, "y": 0}}],
  "edges": [{"id": "e1", "source": "n1", "target": "n2"}]
}
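
The sample wires edge `e1` to node `n2`, which would also have to appear in `nodes`. The server's graph validation rules are not specified here, but a client-side pre-check like this sketch (assuming every edge must reference declared node ids) catches dangling references before submission:

```python
def dangling_edges(nodes, edges):
    """Return ids of edges whose source or target is not a declared node."""
    ids = {n["id"] for n in nodes}
    return [e["id"] for e in edges
            if e["source"] not in ids or e["target"] not in ids]
```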

GET /api/pipeline-defs/{def_id}/versions

List all versions of a definition.

Min role: Auditor

GET /api/pipeline-defs/{def_id}/versions/{ver_id}

Get a specific version with full node/edge data.

Min role: Auditor


Environment Transitions

POST /api/pipeline-defs/{def_id}/versions/{ver_id}/promote

Promote a version to the next environment. Development → QA is direct. QA → UAT and UAT → Production create an approval request and return 202 Accepted.

Min role: DataEngineer

Response (direct): 200 OK

{"status": "promoted", "environment": "qa", "approval_request_id": null}

Response (approval required): 202 Accepted

{"status": "pending_approval", "environment": "qa", "approval_request_id": "pr-..."}

POST /api/pipeline-defs/{def_id}/versions/{ver_id}/demote

Demote a version from UAT back to QA.

Min role: LeadDataEngineer

POST /api/pipeline-defs/{def_id}/versions/{ver_id}/retire

Retire a version from production.

Min role: LeadDataEngineer


Approvals

GET /api/approvals

List promotion requests. Optionally filter by status.

Min role: LeadDataEngineer

Query params: ?status=pending (optional — pending, approved, rejected)

GET /api/approvals/{req_id}

Get approval request details with version snapshot.

Min role: LeadDataEngineer

POST /api/approvals/{req_id}/approve

Approve a pending request. Moves the version to the target environment.

Min role: LeadDataEngineer

Request:

{"reason": "Looks good"}

POST /api/approvals/{req_id}/reject

Reject a pending request. Reason is required.

Min role: LeadDataEngineer

Request:

{"reason": "Needs fix in transform node config"}

GET /api/approvals/{req_id}/comments

List comments on an approval request.

Min role: DataEngineer

POST /api/approvals/{req_id}/comments

Add a comment to an approval request.

Min role: DataEngineer

Request:

{"content": "Updated the sink config per review feedback"}

Import / Export

GET /api/pipeline-defs/{def_id}/versions/{ver_id}/export

Export a version as JSON.

Min role: Auditor

Response:

{
  "format_version": 1,
  "name": "my-pipeline",
  "description": "...",
  "version": 3,
  "nodes": [],
  "edges": [],
  "exported_at": "2026-02-19T..."
}

POST /api/pipeline-defs/import

Import a pipeline from JSON. Creates a new definition with version 1 in development. Rejects files with a format_version higher than the current system version.

Min role: DataEngineer

Request:

{
  "format_version": 1,
  "name": "imported-pipeline",
  "description": "...",
  "nodes": [],
  "edges": []
}
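
A client can mirror the server's format-version gate before uploading. In this sketch the constant `1` is taken from the examples above; the authoritative value lives server-side:

```python
CURRENT_FORMAT_VERSION = 1  # illustrative; the server holds the real value

def can_import(export_doc: dict) -> bool:
    """Pre-check: exports newer than the system format version are rejected."""
    return export_doc.get("format_version", 1) <= CURRENT_FORMAT_VERSION
```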

Notifications

GET /api/notifications/pending-approvals

Get count and list of pending approval requests.

Min role: LeadDataEngineer

Response:

{
  "count": 2,
  "requests": [{"id": "pr-...", "version_id": "ver-...", "from_environment": "qa", "to_environment": "uat", "status": "pending", "...": "..."}]
}

Agent

POST /api/agent/chat

Send a prompt to the TRex agent.

Request:

{
  "prompt": "How healthy is the events table?",
  "provider": "ollama",
  "model": "llama3:8b",
  "api_base": null
}

Response:

{
  "success": true,
  "response": "The events table has 15 snapshots and 42 data files...",
  "tool_calls": [
    {
      "tool": "get_table_health",
      "arguments_json": "{\"table\": \"drls.db.events\"}",
      "result_json": "{\"snapshot_count\": 15, ...}"
    }
  ]
}
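
As with `details_json` on operations, each tool call's `arguments_json` and `result_json` are JSON-encoded strings that need a second decode. A sketch (the sample mirrors the response above, with `result_json` shortened to valid JSON for illustration):

```python
import json

def decode_tool_calls(agent_response):
    """Yield (tool_name, decoded_arguments) pairs from an agent response."""
    return [(c["tool"], json.loads(c["arguments_json"]))
            for c in agent_response.get("tool_calls", [])]

agent_response = {
    "success": True,
    "tool_calls": [{
        "tool": "get_table_health",
        "arguments_json": '{"table": "drls.db.events"}',
        "result_json": '{"snapshot_count": 15}',
    }],
}
```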

Notebook Service

Notebook endpoints require a Kubernetes cluster. When no cluster is available, these endpoints are not registered.

POST /api/notebook/start

Create a marimo notebook pod and service for the authenticated user. Idempotent — returns the existing session if a pod is already running.

Min role: DataEngineer

Response: 200 OK

{
  "status": "pending",
  "url": "/notebook/user-123/",
  "session_id": "marimo-user-123"
}

POST /api/notebook/stop

Delete the authenticated user's notebook pod and service.

Min role: DataEngineer

Response: 204 No Content

GET /api/notebook/status

Poll the authenticated user's notebook pod phase.

Min role: DataEngineer

Response:

{
  "status": "running",
  "url": "/notebook/user-123/"
}

Status values: not_found, pending, running, failed
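
Since `pending` is the only transient status, a client polls until it sees anything else. In this sketch `fetch_status` is injected so it runs without a cluster; a real client would issue the HTTP GET and sleep between attempts:

```python
def wait_for_notebook(fetch_status, max_polls=30):
    """Poll until the notebook leaves 'pending' or max_polls is exhausted."""
    status = "pending"
    for _ in range(max_polls):
        status = fetch_status()
        if status != "pending":
            break
    return status
```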

ANY /notebook/{user_id}/*

Reverse proxy to the user's marimo pod. Handles both HTTP and WebSocket connections. The {user_id} in the path must match the authenticated user.


Configuration

GET /api/config

Get current catalog configuration.

Response:

{
  "configured": true,
  "catalog_type": "hadoop",
  "warehouse": "/tmp/warehouse",
  "catalog_name": "drls",
  "uri": null,
  "extra_props": {}
}

PUT /api/config

Set catalog configuration.

Request:

{
  "catalog_type": "hadoop",
  "warehouse": "/tmp/warehouse",
  "catalog_name": "drls",
  "uri": null,
  "extra_props": {}
}