# Quickstart
This guide walks through initializing a Spark-on-Ray session, running a query, and creating an Iceberg table.
## Basic Usage
```python
"""DRLS Quickstart — Spark-on-Ray with Iceberg.

Run:
    pip install drls
    python examples/quickstart.py
"""
import ray

import drls

ray.init()

# Start a Spark-on-Ray cluster with a local Iceberg catalog
spark = drls.init_spark(
    app_name="quickstart",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Basic Spark operations work as normal
df = spark.range(100).toDF("id")
df = df.withColumn("doubled", df["id"] * 2)
print(f"Row count: {df.count()}")  # 100

# Create an Iceberg table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.demo")
spark.sql("""
    CREATE TABLE IF NOT EXISTS drls.demo.numbers (
        id BIGINT,
        doubled BIGINT
    ) USING iceberg
""")
df.writeTo("drls.demo.numbers").overwritePartitions()

# Query it back
result = spark.table("drls.demo.numbers")
print(f"Iceberg table count: {result.count()}")  # 100
result.show(5)

# Convert Spark DataFrame to Ray Dataset
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(f"Ray Dataset count: {ray_ds.count()}")  # 100

# Convert back
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
print(f"Round-trip count: {df2.count()}")  # 100

# Cleanup
drls.stop_spark()
ray.shutdown()

print("Quickstart complete!")
```
## Step by Step
### 1. Initialize Ray

Calling `ray.init()` starts a local Ray cluster. In production, connect to an existing cluster with `ray.init(address="auto")`.
### 2. Create a Spark Session
```python
import drls

spark = drls.init_spark(
    app_name="quickstart",
    num_executors=1,
    executor_cores=1,
    executor_memory="500M",
)
```
This launches Spark executors as Ray actors. Parameters:
| Parameter | Description |
|---|---|
| `app_name` | Spark application name |
| `num_executors` | Number of Spark executors (Ray actors) |
| `executor_cores` | CPU cores per executor |
| `executor_memory` | Memory per executor (e.g., `"500M"`, `"2G"`) |
### 3. Run Queries

Use the standard PySpark DataFrame and SQL APIs:
### 4. Clean Up
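Stop the Spark session before shutting down Ray, as in the full script above:

```python
drls.stop_spark()  # tears down the Spark executors running as Ray actors
ray.shutdown()     # shuts down (or disconnects from) the Ray cluster
```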
## With Iceberg
Enable Iceberg by passing catalog configuration to `init_spark`:
```python
spark = drls.init_spark(
    app_name="iceberg-demo",
    num_executors=1,
    executor_cores=1,
    executor_memory="500M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Create a namespace and a table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.db")
spark.sql("""
    CREATE TABLE drls.db.events (
        id BIGINT,
        ts TIMESTAMP,
        payload STRING
    ) USING iceberg
""")

# Insert data
spark.sql("INSERT INTO drls.db.events VALUES (1, current_timestamp(), 'hello')")

# Query
spark.sql("SELECT * FROM drls.db.events").show()
```
See Iceberg Operations for compaction, schema evolution, time travel, and more.
## Using the CLI
DRLS includes a CLI for common operations:
```shell
# List tables
drls tables --catalog hadoop --warehouse /tmp/warehouse

# Check table health
drls health drls.db.events --catalog hadoop --warehouse /tmp/warehouse

# Run compaction
drls compact drls.db.events --strategy binpack --catalog hadoop --warehouse /tmp/warehouse
```
See the CLI Guide for all commands.
## Next Steps
- Python API — Full API documentation
- Streaming — Bidirectional Spark-Ray streaming bridge
- Agentic — AI-driven lakehouse management
- Management Console — React UI for visual management