Agentic Layer

DRLS includes an AI-driven management layer that lets you operate your lakehouse using natural language. It supports any LLM provider (cloud or local) via litellm.

Setup

```bash
pip install "drls[agent]"
```

TRex

The TRex agent connects an LLM to 13 lakehouse tools:

```python
from drls.agentic import TRex

agent = TRex(
    spark,
    provider="ollama",      # LLM provider
    model="llama3:70b",     # Model name
    api_base=None,          # Custom API endpoint (optional)
    api_key=None,           # API key (optional, reads from env)
)

result = agent.run("How healthy is the events table?")
print(result["response"])
```

Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `spark` | `SparkSession` | — | Active Spark session (required) |
| `provider` | `str` | `"ollama"` | LLM provider name |
| `model` | `str` | `"llama3:70b"` | Model identifier |
| `api_base` | `str \| None` | `None` | Custom API base URL |
| `api_key` | `str \| None` | `None` | API key (or set via env var) |
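The dict returned by `agent.run()` carries `success`, `response`, `tool_calls`, and `error` keys, as the example script later on this page shows. A minimal defensive handler for that shape (the stub `result` below is illustrative, not the output of a real agent call):

```python
# Illustrative stub of the dict shape agent.run() returns
# (success / response / tool_calls / error keys).
result = {
    "success": True,
    "response": "The events table has 15 small files; compaction recommended.",
    "tool_calls": [{"tool": "get_table_health"}],
}

def summarize(result: dict) -> str:
    """Render an agent result defensively, tolerating missing keys."""
    if result.get("success"):
        calls = result.get("tool_calls") or []
        return f"{result['response']} ({len(calls)} tool call(s))"
    return f"Error: {result.get('error', 'unknown')}"

print(summarize(result))
```

Guarding with `.get()` keeps the caller robust if the agent fails before any tool runs and some keys are absent.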

Supported Providers

| Provider | Example Models | API Key Env Var |
|----------|----------------|-----------------|
| `ollama` | `llama3:70b`, `llama3:8b` | — (local) |
| `openai` | `gpt-4o`, `gpt-4o-mini` | `OPENAI_API_KEY` |
| `anthropic` | `claude-sonnet-4-20250514` | `ANTHROPIC_API_KEY` |
| `google` | `gemini-pro` | `GOOGLE_API_KEY` |
| `mistral` | `mistral-large-latest` | `MISTRAL_API_KEY` |
| `vllm` | any HF model | — (local) |
| `text-generation-inference` | any HF model | — (local) |
| `lm_studio` | any local model | — (local) |

Available Tools

The agent has access to 13 tools covering all lakehouse operations:

| Tool | Description |
|------|-------------|
| `list_tables` | List all Iceberg tables in the catalog |
| `get_table_health` | Analyze table health (file count, sizes, recommendations) |
| `get_schema` | Get the schema of a table |
| `compact_table` | Run file compaction (binpack/sort/zorder) |
| `expire_snapshots` | Expire old snapshots |
| `remove_orphan_files` | Remove orphan data files |
| `table_history` | Show snapshot history |
| `rollback_to` | Roll back to a previous snapshot |
| `add_column` | Add a new column |
| `rename_column` | Rename a column |
| `drop_column` | Drop a column |
| `add_partition_field` | Add a partition field |
| `run_sql` | Execute arbitrary SQL |
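Before exposing the agent to other users, it can help to distinguish read-only tools from those that mutate tables. The tool names below come from the table above; the read-only/mutating split is an illustrative policy choice on our part, not part of the DRLS API:

```python
# Tool names from the table above; the grouping is an illustrative
# allow-listing policy, not part of the DRLS API.
READ_ONLY_TOOLS = {
    "list_tables", "get_table_health", "get_schema", "table_history",
}
MUTATING_TOOLS = {
    "compact_table", "expire_snapshots", "remove_orphan_files",
    "rollback_to", "add_column", "rename_column", "drop_column",
    "add_partition_field", "run_sql",  # run_sql can execute arbitrary DDL/DML
}

# Sanity check: the two groups cover all 13 tools exactly once.
assert len(READ_ONLY_TOOLS | MUTATING_TOOLS) == 13
```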

CLI Usage

```bash
# With Ollama (local, air-gapped)
drls agent "How healthy is the events table?" --provider ollama --model llama3:8b

# With OpenAI
drls agent "Compact all tables with small files" --provider openai --model gpt-4o

# With Anthropic
drls agent "What's the schema of the users table?" --provider anthropic --model claude-sonnet-4-20250514
```

MCP Server

For integration with MCP-compatible AI clients (e.g., Claude Desktop):

```bash
drls mcp-server --host 127.0.0.1 --port 8100
```

This exposes all 13 tools via the Model Context Protocol, allowing any MCP client to manage your lakehouse.
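MCP clients differ in how they are configured. As a sketch only, an HTTP-capable client might be pointed at the server with a config entry like the following; the exact schema depends on the client, and the `drls-lakehouse` name is an arbitrary label of ours:

```json
{
  "mcpServers": {
    "drls-lakehouse": {
      "url": "http://127.0.0.1:8100"
    }
  }
}
```

Consult your MCP client's documentation for the exact registration format it expects.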

Example

"""TRex agent demo.

Demonstrates using TRex, the DRLS AI agent, to manage Iceberg tables
via natural language commands. Works with any LLM provider (cloud
or local via Ollama/vLLM).

Run with Ollama (air-gap):
    pip install drls[agent]
    ollama serve &
    ollama pull llama3:8b
    python examples/agent_demo.py --provider ollama --model llama3:8b

Run with OpenAI:
    export OPENAI_API_KEY=sk-...
    python examples/agent_demo.py --provider openai --model gpt-4o

Run with Anthropic:
    export ANTHROPIC_API_KEY=sk-ant-...
    python examples/agent_demo.py --provider anthropic --model claude-sonnet-4-20250514
"""
import argparse
import json
import ray
import drls
from drls.iceberg import compact_table, get_table_health

ray.init()

spark = drls.init_spark(
    app_name="agent-demo",
    num_executors=1,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Setup: create a table with some data
spark.sql("CREATE DATABASE IF NOT EXISTS drls.agent_demo")
spark.sql("DROP TABLE IF EXISTS drls.agent_demo.events")
spark.sql("""
    CREATE TABLE drls.agent_demo.events (
        id BIGINT,
        event_type STRING,
        payload STRING
    ) USING iceberg
""")

# Insert some data in multiple small batches to create fragmentation
for i in range(5):
    spark.sql(f"""
        INSERT INTO drls.agent_demo.events VALUES
        ({i*10 + 1}, 'click', '{{"page": "home"}}'),
        ({i*10 + 2}, 'view', '{{"page": "product"}}'),
        ({i*10 + 3}, 'purchase', '{{"amount": {i * 10 + 5}}}')
    """)

# Check health manually first
print("--- Manual Health Check ---")
health = get_table_health(spark, "drls.agent_demo.events")
print(json.dumps(health, indent=2))

# Parse arguments for LLM provider
parser = argparse.ArgumentParser()
parser.add_argument("--provider", default="ollama", help="LLM provider")
parser.add_argument("--model", default="llama3:8b", help="Model name")
parser.add_argument("--api-base", default=None, help="API base URL")
args = parser.parse_args()

print(f"\n--- Agent Demo (provider={args.provider}, model={args.model}) ---")

try:
    from drls.agentic import TRex

    agent = TRex(
        spark,
        provider=args.provider,
        model=args.model,
        api_base=args.api_base,
    )

    # Ask the agent about table health
    prompts = [
        "How healthy is the events table in the agent_demo database?",
        "How many tables are in the catalog?",
        "What's the schema of agent_demo.events?",
    ]

    for prompt in prompts:
        print(f"\nUser: {prompt}")
        result = agent.run(prompt)
        if result.get("success"):
            print(f"TRex: {result['response']}")
            if result.get("tool_calls"):
                print(f"  (used {len(result['tool_calls'])} tool calls)")
        else:
            print(f"Error: {result.get('error')}")

except ImportError:
    print("Agent dependencies not installed. Run: pip install drls[agent]")
except Exception as e:
    print(f"Agent error (expected if no LLM available): {e}")

# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nAgent demo complete!")