Python Package Architecture
The python/drls/ package contains the core runtime, Iceberg operations, streaming bridge, agentic layer, and server components.
Module Map
python/drls/
├── __init__.py # Public API: init_spark, stop_spark, conversions
├── _version.py # Package version
├── context.py # Spark session lifecycle (init_spark, stop_spark)
├── cluster.py # Ray cluster management
├── dataset.py # DataFrame conversions (Spark ↔ Ray ↔ Pandas)
│
├── iceberg/ # Iceberg operations
│ ├── __init__.py # Re-exports all public functions
│ ├── catalog.py # IcebergCatalogConfig, configure_iceberg
│ ├── table_ops.py # compact_table, expire_snapshots, get_table_health, remove_orphan_files
│ ├── schema_evolution.py # add_column, rename_column, drop_column, alter_column_type, get_schema
│ ├── partition_evolution.py # add_partition_field, drop_partition_field, get_partition_spec
│ ├── time_travel.py # snapshot_at, history, rollback_to
│ └── cdc.py # read_cdc_stream, cdc_to_ray
│
├── streaming/ # Bidirectional Spark ↔ Ray bridge
│ ├── __init__.py # from_spark_streaming, to_spark_streaming, create_partitioned_iterators
│ ├── coordinator.py # StreamCoordinator (Ray actor, async, Arrow buffering)
│ ├── sink.py # SparkStreamingSink, JvmStreamingSink (foreachBatch)
│ ├── consumer.py # StreamingIterator (pull-based Arrow consumption)
│ ├── source.py # Reverse bridge: _ProducerThread, _JvmBridgeThread
│ └── monitor.py # print_stats (terminal-based monitoring)
│
├── agentic/ # AI-driven management
│ ├── __init__.py # Re-exports TRex
│ ├── agent.py # TRex agent (litellm, multi-provider)
│ ├── tools.py # 13 OpenAI-format tool definitions
│ ├── executor.py # Tool execution dispatch
│ └── mcp_server.py # MCP protocol server
│
├── grpc_server/ # gRPC server (optional, for external consumers)
│ ├── server.py # grpcio server, registers all servicers
│ ├── codegen.py # Generates stubs from proto
│ ├── generated/ # Auto-generated stubs (gitignored)
│ └── servicers/ # One per gRPC service
│ ├── catalog.py
│ ├── health.py
│ ├── operations.py
│ ├── streaming.py
│ ├── agent.py
│ └── config.py
│
├── cli.py # Click CLI (11 commands)
└── tests/ # Unit tests
Key Entry Points
init_spark() — Session Lifecycle
The main entry point for all Spark operations. It creates a Spark session whose executors are backed by Ray actors.
spark = drls.init_spark(
    app_name="my-app",
    num_executors=2,
    executor_cores=1,
    executor_memory="1G",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/warehouse",
)
stop_spark() — Cleanup
Stops the Spark session and cleans up Ray resources.
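Because stop_spark() must run even when the work inside the session fails, a context-manager wrapper is a natural pattern. The sketch below is hypothetical (not part of drls); it takes the init/stop callables as parameters so the pattern can be shown without importing the package — in practice they would be drls.init_spark and drls.stop_spark.

```python
from contextlib import contextmanager

@contextmanager
def spark_session(init, stop, **kwargs):
    """Yield a session created by init(**kwargs); always call stop() on exit.

    `init` / `stop` are stand-ins for drls.init_spark / drls.stop_spark.
    The finally block guarantees Ray resources are released even if the
    body of the `with` statement raises.
    """
    session = init(**kwargs)
    try:
        yield session
    finally:
        stop()
```

Usage would then look like `with spark_session(drls.init_spark, drls.stop_spark, app_name="my-app") as spark: ...`.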
DataFrame Conversions
# Spark → Ray
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
# Ray → Spark
df = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
Streaming Architecture
The streaming bridge uses a Ray actor (StreamCoordinator) as an intermediary buffer:
graph LR
subgraph "Forward Bridge"
SS["Spark Streaming"] --> Sink["SparkStreamingSink"]
Sink --> SC["StreamCoordinator<br/>(Ray Actor)"]
SC --> SI["StreamingIterator"]
SI --> Ray["Ray Consumer"]
end
graph LR
subgraph "Reverse Bridge"
RS["Ray Source"] --> PT["ProducerThread"]
PT --> SC2["StreamCoordinator<br/>(Ray Actor)"]
SC2 --> JB["JvmBridgeThread"]
JB --> SS2["Spark Streaming DF"]
end
The coordinator is a named Ray actor that handles backpressure, buffering, and partition routing. Data flows as Arrow tables for zero-copy efficiency.
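The buffering-and-backpressure role of the coordinator can be illustrated with a toy, pure-Python stand-in. MiniCoordinator below is hypothetical — the real StreamCoordinator is a Ray actor buffering Arrow tables — but the mechanism is the same: one bounded queue per partition, so producers block when a partition's buffer is full (backpressure) and consumers pull batches one at a time.

```python
import queue

class MiniCoordinator:
    """Toy stand-in for StreamCoordinator: per-partition bounded buffers.

    Bounded queues provide backpressure: put() blocks (or raises
    queue.Full on timeout) when a partition's buffer is full. In drls
    the batches are Arrow tables held inside a Ray actor; plain Python
    objects stand in here.
    """

    def __init__(self, num_partitions, max_batches=4):
        self._queues = [queue.Queue(maxsize=max_batches)
                        for _ in range(num_partitions)]

    def put(self, partition, batch, timeout=None):
        # Producer side: blocks when the partition buffer is full.
        self._queues[partition].put(batch, timeout=timeout)

    def get(self, partition, timeout=None):
        # Consumer side: pull-based, one batch at a time
        # (the role StreamingIterator plays in the forward bridge).
        return self._queues[partition].get(timeout=timeout)
```

The real coordinator adds async handling, named-actor discovery, and partition routing on top of this core idea.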
Agentic Architecture
The agentic layer uses a tool-calling LLM pattern:
- User sends a natural language prompt
- TRex sends the prompt plus 13 tool definitions to the LLM (via litellm)
- The LLM returns tool calls
- executor.py dispatches the tool calls to Iceberg/Spark functions
- Results are returned to the LLM for the final response
Supported providers (via litellm): OpenAI, Anthropic, Google, Mistral, Ollama, vLLM, TGI, LM Studio.
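The dispatch step can be sketched in a few lines. This is a simplified, hypothetical version of the role executor.py plays — not its actual code — assuming each tool call has been JSON-parsed down to a name and an arguments dict, and that the executor holds a registry mapping tool names to functions such as get_table_health.

```python
def dispatch_tool_calls(tool_calls, registry):
    """Route parsed LLM tool calls to registered functions.

    tool_calls: list of {"name": str, "arguments": dict}, the shape an
    OpenAI-format tool call reduces to after parsing its JSON arguments.
    registry: dict mapping tool name -> callable.
    """
    results = []
    for call in tool_calls:
        fn = registry.get(call["name"])
        if fn is None:
            # Unknown tools are reported back rather than raising, so the
            # LLM can see the error and recover in its final response.
            results.append({"name": call["name"], "error": "unknown tool"})
            continue
        results.append({"name": call["name"],
                        "result": fn(**call["arguments"])})
    return results
```

The results list is what gets appended to the conversation and sent back to the LLM for the final response.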