# Agentic Layer
DRLS includes an AI-driven management layer that lets you operate your lakehouse using natural language. It supports any LLM provider (cloud or local) via `litellm`.
## Setup
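The agentic layer is an optional extra (the `drls[agent]` extra is the one referenced in the example script below):

```shell
pip install drls[agent]
```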
## TRex
The TRex agent connects an LLM to 13 lakehouse tools:
```python
from drls.agentic import TRex

agent = TRex(
    spark,
    provider="ollama",    # LLM provider
    model="llama3:70b",   # Model name
    api_base=None,        # Custom API endpoint (optional)
    api_key=None,         # API key (optional, reads from env)
)

result = agent.run("How healthy is the events table?")
print(result["response"])
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | — | Active Spark session |
| `provider` | `str` | `"ollama"` | LLM provider name |
| `model` | `str` | `"llama3:70b"` | Model identifier |
| `api_base` | `str \| None` | `None` | Custom API base URL |
| `api_key` | `str \| None` | `None` | API key (or set via env var) |
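Since DRLS routes requests through `litellm`, the `provider` and `model` parameters presumably combine into litellm's `"provider/model"` identifier. A minimal sketch of that mapping (the helper name is hypothetical, not part of the DRLS API):

```python
def litellm_model_string(provider: str, model: str) -> str:
    """Combine provider and model into litellm's "provider/model" form."""
    # e.g. "ollama/llama3:70b" routes to a local Ollama server,
    # "openai/gpt-4o" routes to the OpenAI API
    return f"{provider}/{model}"

print(litellm_model_string("ollama", "llama3:70b"))  # ollama/llama3:70b
```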
### Supported Providers
| Provider | Example Model | API Key Env Var |
|---|---|---|
| `ollama` | `llama3:70b`, `llama3:8b` | — (local) |
| `openai` | `gpt-4o`, `gpt-4o-mini` | `OPENAI_API_KEY` |
| `anthropic` | `claude-sonnet-4-20250514` | `ANTHROPIC_API_KEY` |
| `google` | `gemini-pro` | `GOOGLE_API_KEY` |
| `mistral` | `mistral-large-latest` | `MISTRAL_API_KEY` |
| `vllm` | any HF model | — (local) |
| `text-generation-inference` | any HF model | — (local) |
| `lm_studio` | any local model | — (local) |
### Available Tools
The agent has access to 13 tools covering all lakehouse operations:
| Tool | Description |
|---|---|
| `list_tables` | List all Iceberg tables in the catalog |
| `get_table_health` | Analyze table health (file count, sizes, recommendations) |
| `get_schema` | Get the schema of a table |
| `compact_table` | Run file compaction (binpack/sort/zorder) |
| `expire_snapshots` | Expire old snapshots |
| `remove_orphan_files` | Remove orphan data files |
| `table_history` | Show snapshot history |
| `rollback_to` | Roll back to a previous snapshot |
| `add_column` | Add a new column |
| `rename_column` | Rename a column |
| `drop_column` | Drop a column |
| `add_partition_field` | Add a partition field |
| `run_sql` | Execute arbitrary SQL |
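An agent loop like TRex's typically maps each tool call emitted by the LLM onto a Python function by name. A self-contained sketch of that dispatch pattern (the registry and tool bodies here are mocks for illustration, not the DRLS implementations):

```python
import json

# Mock implementations mirroring two of the 13 tools above
def list_tables() -> list[str]:
    return ["drls.agent_demo.events"]

def get_schema(table: str) -> dict:
    return {"id": "bigint", "event_type": "string", "payload": "string"}

# Registry: tool name -> callable
TOOLS = {"list_tables": list_tables, "get_schema": get_schema}

def dispatch(tool_call: dict) -> str:
    """Run the named tool with the LLM-supplied arguments; return JSON for the LLM."""
    func = TOOLS[tool_call["name"]]
    result = func(**tool_call.get("arguments", {}))
    return json.dumps(result)

print(dispatch({"name": "get_schema",
                "arguments": {"table": "drls.agent_demo.events"}}))
```

The JSON result is fed back to the LLM as the tool's observation, and the loop repeats until the model produces a final answer.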
## CLI Usage
```shell
# With Ollama (local, air-gapped)
drls agent "How healthy is the events table?" --provider ollama --model llama3:8b

# With OpenAI
drls agent "Compact all tables with small files" --provider openai --model gpt-4o

# With Anthropic
drls agent "What's the schema of the users table?" --provider anthropic --model claude-sonnet-4-20250514
```
## MCP Server
DRLS can also expose all 13 tools via the Model Context Protocol, allowing MCP-compatible AI clients (e.g., Claude Desktop) to manage your lakehouse.
## Example
"""TRex agent demo.
Demonstrates using TRex, the DRLS AI agent, to manage Iceberg tables
via natural language commands. Works with any LLM provider (cloud
or local via Ollama/vLLM).
Run with Ollama (air-gap):
pip install drls[agent]
ollama serve &
ollama pull llama3:8b
python examples/agent_demo.py --provider ollama --model llama3:8b
Run with OpenAI:
export OPENAI_API_KEY=sk-...
python examples/agent_demo.py --provider openai --model gpt-4o
Run with Anthropic:
export ANTHROPIC_API_KEY=sk-ant-...
python examples/agent_demo.py --provider anthropic --model claude-sonnet-4-20250514
"""
import argparse
import json
import ray
import drls
from drls.iceberg import compact_table, get_table_health
ray.init()
spark = drls.init_spark(
app_name="agent-demo",
num_executors=1,
executor_cores=1,
executor_memory="512M",
iceberg_catalog="hadoop",
iceberg_warehouse="/tmp/drls-warehouse",
)
# Setup: create a table with some data
spark.sql("CREATE DATABASE IF NOT EXISTS drls.agent_demo")
spark.sql("DROP TABLE IF EXISTS drls.agent_demo.events")
spark.sql("""
CREATE TABLE drls.agent_demo.events (
id BIGINT,
event_type STRING,
payload STRING
) USING iceberg
""")
# Insert some data in multiple small batches to create fragmentation
for i in range(5):
spark.sql(f"""
INSERT INTO drls.agent_demo.events VALUES
({i*10 + 1}, 'click', '{{"page": "home"}}'),
({i*10 + 2}, 'view', '{{"page": "product"}}'),
({i*10 + 3}, 'purchase', '{{"amount": {i * 10 + 5}}}')
""")
# Check health manually first
print("--- Manual Health Check ---")
health = get_table_health(spark, "drls.agent_demo.events")
print(json.dumps(health, indent=2))
# Parse arguments for LLM provider
parser = argparse.ArgumentParser()
parser.add_argument("--provider", default="ollama", help="LLM provider")
parser.add_argument("--model", default="llama3:8b", help="Model name")
parser.add_argument("--api-base", default=None, help="API base URL")
args = parser.parse_args()
print(f"\n--- Agent Demo (provider={args.provider}, model={args.model}) ---")
try:
from drls.agentic import TRex
agent = TRex(
spark,
provider=args.provider,
model=args.model,
api_base=args.api_base,
)
# Ask the agent about table health
prompts = [
"How healthy is the events table in the agent_demo database?",
"How many tables are in the catalog?",
"What's the schema of agent_demo.events?",
]
for prompt in prompts:
print(f"\nUser: {prompt}")
result = agent.run(prompt)
if result.get("success"):
print(f"TRex: {result['response']}")
if result.get("tool_calls"):
print(f" (used {len(result['tool_calls'])} tool calls)")
else:
print(f"Error: {result.get('error')}")
except ImportError:
print("Agent dependencies not installed. Run: pip install drls[agent]")
except Exception as e:
print(f"Agent error (expected if no LLM available): {e}")
# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nAgent demo complete!")