Quickstart Walkthrough
File: examples/quickstart.py
This example demonstrates the basics of DRLS: creating a Spark-on-Ray session, running queries, creating Iceberg tables, and converting between Spark DataFrames and Ray Datasets.
Full Source
"""DRLS Quickstart — Spark-on-Ray with Iceberg.
Run:
pip install drls
python examples/quickstart.py
"""
import ray
import drls
ray.init()
# Start a Spark-on-Ray cluster with a local Iceberg catalog
spark = drls.init_spark(
    app_name="quickstart",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)
# Basic Spark operations work as normal
df = spark.range(100).toDF("id")
df = df.withColumn("doubled", df["id"] * 2)
print(f"Row count: {df.count()}") # 100
# Create an Iceberg table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.demo")
spark.sql("""
CREATE TABLE IF NOT EXISTS drls.demo.numbers (
id BIGINT,
doubled BIGINT
) USING iceberg
""")
df.writeTo("drls.demo.numbers").overwritePartitions()
# Query it back
result = spark.table("drls.demo.numbers")
print(f"Iceberg table count: {result.count()}") # 100
result.show(5)
# Convert Spark DataFrame to Ray Dataset
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(f"Ray Dataset count: {ray_ds.count()}") # 100
# Convert back
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
print(f"Round-trip count: {df2.count()}") # 100
# Cleanup
drls.stop_spark()
ray.shutdown()
print("Quickstart complete!")
Step-by-Step
Initialize Ray and Spark
ray.init()
spark = drls.init_spark(
    app_name="quickstart",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)
init_spark() launches the Spark executors as Ray actors inside the running Ray cluster. The iceberg_catalog and iceberg_warehouse parameters configure a local, filesystem-backed ("hadoop") Iceberg catalog rooted at the given warehouse path.
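For reference, a filesystem-backed Iceberg catalog in Spark is normally wired up through `spark.sql.catalog.*` properties. The sketch below shows what the call above would correspond to if expressed as plain Spark configuration; the property names are standard Iceberg ones, but that DRLS sets exactly these keys internally is an assumption inferred from the `init_spark()` arguments:

```properties
# Register a Spark catalog named "drls" backed by Iceberg
# (standard Iceberg Spark properties; DRLS's internal wiring is assumed, not confirmed)
spark.sql.catalog.drls=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.drls.type=hadoop
spark.sql.catalog.drls.warehouse=/tmp/drls-warehouse
```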
Create and Query Data
df = spark.range(100).toDF("id")
df = df.withColumn("doubled", df["id"] * 2)
print(f"Row count: {df.count()}") # 100
Standard PySpark operations work as expected — the Spark executors run inside Ray.
Create an Iceberg Table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.demo")
spark.sql("""
CREATE TABLE IF NOT EXISTS drls.demo.numbers (
id BIGINT, doubled BIGINT
) USING iceberg
""")
df.writeTo("drls.demo.numbers").overwritePartitions()
The drls prefix is the catalog name configured in init_spark().
Convert Between Runtimes
# Spark → Ray
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(f"Ray Dataset count: {ray_ds.count()}") # 100
# Ray → Spark
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
print(f"Round-trip count: {df2.count()}") # 100
Both conversions exchange data as Apache Arrow record batches, which keeps the transfer efficient and zero-copy where the data layout allows it.
Cleanup
drls.stop_spark()
ray.shutdown()
Always call stop_spark() before ray.shutdown(): the Spark executors are Ray actors, so shutting Ray down first would kill them abruptly instead of letting Spark release its resources cleanly.