Python Package Architecture

The python/drls/ package contains the core runtime, Iceberg operations, streaming bridge, agentic layer, and server components.

Module Map

python/drls/
├── __init__.py              # Public API: init_spark, stop_spark, conversions
├── _version.py              # Package version
├── context.py               # Spark session lifecycle (init_spark, stop_spark)
├── cluster.py               # Ray cluster management
├── dataset.py               # DataFrame conversions (Spark ↔ Ray ↔ Pandas)
├── iceberg/                 # Iceberg operations
│   ├── __init__.py          # Re-exports all public functions
│   ├── catalog.py           # IcebergCatalogConfig, configure_iceberg
│   ├── table_ops.py         # compact_table, expire_snapshots, get_table_health, remove_orphan_files
│   ├── schema_evolution.py  # add_column, rename_column, drop_column, alter_column_type, get_schema
│   ├── partition_evolution.py  # add_partition_field, drop_partition_field, get_partition_spec
│   ├── time_travel.py       # snapshot_at, history, rollback_to
│   └── cdc.py               # read_cdc_stream, cdc_to_ray
├── streaming/               # Bidirectional Spark ↔ Ray bridge
│   ├── __init__.py          # from_spark_streaming, to_spark_streaming, create_partitioned_iterators
│   ├── coordinator.py       # StreamCoordinator (Ray actor, async, Arrow buffering)
│   ├── sink.py              # SparkStreamingSink, JvmStreamingSink (foreachBatch)
│   ├── consumer.py          # StreamingIterator (pull-based Arrow consumption)
│   ├── source.py            # Reverse bridge: _ProducerThread, _JvmBridgeThread
│   └── monitor.py           # print_stats (terminal-based monitoring)
├── agentic/                 # AI-driven management
│   ├── __init__.py          # Re-exports TRex
│   ├── agent.py             # TRex agent (litellm, multi-provider)
│   ├── tools.py             # 13 OpenAI-format tool definitions
│   ├── executor.py          # Tool execution dispatch
│   └── mcp_server.py        # MCP protocol server
├── grpc_server/             # gRPC server (optional, for external consumers)
│   ├── server.py            # grpcio server, registers all servicers
│   ├── codegen.py           # Generates stubs from proto
│   ├── generated/           # Auto-generated stubs (gitignored)
│   └── servicers/           # One per gRPC service
│       ├── catalog.py
│       ├── health.py
│       ├── operations.py
│       ├── streaming.py
│       ├── agent.py
│       └── config.py
├── cli.py                   # Click CLI (11 commands)
└── tests/                   # Unit tests

Key Entry Points

init_spark() — Session Lifecycle

init_spark() is the main entry point for all Spark operations. It creates a Spark session whose executors run as Ray actors.

import drls

spark = drls.init_spark(
    app_name="my-app",
    num_executors=2,
    executor_cores=1,
    executor_memory="1G",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/warehouse",
)

stop_spark() — Cleanup

stop_spark() stops the Spark session and cleans up the Ray resources behind it.
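
Because cleanup should run even when a job fails, it can help to pair the two calls in a context manager. The following is a minimal sketch, not part of the package: `spark_session`, `fake_init`, and `fake_stop` are hypothetical names, and it assumes `stop_spark` can be called without arguments, which the text above does not confirm.

```python
from contextlib import contextmanager


@contextmanager
def spark_session(init_fn, stop_fn, **init_kwargs):
    """Yield a session from init_fn and always call stop_fn on exit."""
    session = init_fn(**init_kwargs)
    try:
        yield session
    finally:
        # Runs on normal exit and on exceptions, so cleanup is never skipped.
        stop_fn()


# Stand-ins so the sketch runs without a cluster. With the real package,
# pass drls.init_spark and drls.stop_spark instead (assumed signatures).
calls = []


def fake_init(**kwargs):
    calls.append(("init", kwargs["app_name"]))
    return object()


def fake_stop():
    calls.append(("stop",))


with spark_session(fake_init, fake_stop, app_name="my-app") as spark:
    calls.append(("work",))
```

The `finally` clause is the design point: the stop function runs whether the body of the `with` block returns normally or raises.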

DataFrame Conversions

# Spark → Ray (df is an existing Spark DataFrame)
ray_ds = drls.spark_dataframe_to_ray_dataset(df)

# Ray → Spark (spark is the session returned by init_spark)
df = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)

Streaming Architecture

The streaming bridge uses a Ray actor (StreamCoordinator) as an intermediary buffer:

graph LR
    subgraph "Forward Bridge"
        SS["Spark Streaming"] --> Sink["SparkStreamingSink"]
        Sink --> SC["StreamCoordinator<br/>(Ray Actor)"]
        SC --> SI["StreamingIterator"]
        SI --> Ray["Ray Consumer"]
    end

graph LR
    subgraph "Reverse Bridge"
        RS["Ray Source"] --> PT["ProducerThread"]
        PT --> SC2["StreamCoordinator<br/>(Ray Actor)"]
        SC2 --> JB["JvmBridgeThread"]
        JB --> SS2["Spark Streaming DF"]
    end

The coordinator is a named Ray actor that handles backpressure, buffering, and partition routing. Data flows as Arrow tables for zero-copy efficiency.
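
To illustrate what buffering with backpressure and partition routing can look like, here is a toy sketch that uses plain `asyncio` queues instead of a Ray actor. It is not the `StreamCoordinator` implementation: the class `BufferSketch`, its methods, and the modulo routing rule are all assumptions made for illustration, and strings stand in for Arrow tables.

```python
import asyncio


class BufferSketch:
    """Toy stand-in for a coordinator: one bounded queue per partition."""

    def __init__(self, num_partitions, max_batches):
        self._queues = [
            asyncio.Queue(maxsize=max_batches) for _ in range(num_partitions)
        ]

    async def push(self, key, batch):
        # Partition routing: an integer key is mapped to a queue by modulo.
        queue = self._queues[key % len(self._queues)]
        # Backpressure: put() suspends the producer while the queue is full.
        await queue.put(batch)

    async def pull(self, partition):
        # Consumers pull; get() suspends while the queue is empty.
        return await self._queues[partition].get()


async def demo():
    buf = BufferSketch(num_partitions=2, max_batches=1)
    await buf.push(0, "batch-a")
    # Key 2 also routes to partition 0, which is full, so this push waits.
    blocked = asyncio.create_task(buf.push(2, "batch-b"))
    await asyncio.sleep(0)  # let the task run until it suspends
    was_blocked = not blocked.done()
    first = await buf.pull(0)  # frees a slot and wakes the producer
    await blocked
    second = await buf.pull(0)
    return was_blocked, first, second


result = asyncio.run(demo())
```

The bounded queue is what produces backpressure here: a producer that outruns its consumer is suspended rather than growing the buffer without limit.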

Agentic Architecture

The agentic layer follows the LLM tool-calling pattern:

  1. The user sends a natural-language prompt.
  2. TRex sends the prompt and the 13 tool definitions to the LLM (via litellm).
  3. The LLM returns tool calls.
  4. executor.py dispatches each tool call to the matching Iceberg/Spark function.
  5. The results are returned to the LLM, which produces the final response.
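
The dispatch step can be sketched in a few lines. This is an illustration under stated assumptions, not the contents of tools.py or executor.py: the tool schema, the `HANDLERS` table, the `dispatch` function, and the stub handler are hypothetical, and the parameters given for `get_table_health` are invented for the example. Only the message shapes follow the OpenAI function-calling format.

```python
import json

# One tool definition in the OpenAI function-calling format.
# The description and parameters here are hypothetical.
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_table_health",
            "description": "Report health metrics for an Iceberg table.",
            "parameters": {
                "type": "object",
                "properties": {"table": {"type": "string"}},
                "required": ["table"],
            },
        },
    }
]


def fake_get_table_health(table):
    # Stand-in for a real Iceberg call; returns canned data.
    return {"table": table, "status": "stub"}


# Maps tool names to the Python callables that implement them.
HANDLERS = {"get_table_health": fake_get_table_health}


def dispatch(tool_call):
    """Run one tool call and build the 'tool' message sent back to the LLM."""
    name = tool_call["function"]["name"]
    # Arguments arrive as a JSON-encoded string, not a dict.
    args = json.loads(tool_call["function"]["arguments"])
    handler = HANDLERS.get(name)
    if handler is None:
        content = {"error": f"unknown tool: {name}"}
    else:
        content = handler(**args)
    return {
        "role": "tool",
        "tool_call_id": tool_call["id"],
        "content": json.dumps(content),
    }


# Shape of a tool call as it appears in an OpenAI-style assistant message.
call = {
    "id": "call_1",
    "type": "function",
    "function": {"name": "get_table_health", "arguments": '{"table": "db.events"}'},
}
reply = dispatch(call)
```

Returning an error payload for an unknown tool name, rather than raising, lets the LLM see the failure and recover in its next turn.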

Supported providers (via litellm): OpenAI, Anthropic, Google, Mistral, Ollama, vLLM, TGI, LM Studio.