Quickstart Walkthrough

File: examples/quickstart.py

This example demonstrates the basics of DRLS: creating a Spark-on-Ray session, running queries, creating Iceberg tables, and converting between Spark DataFrames and Ray Datasets.

Full Source

"""DRLS Quickstart — Spark-on-Ray with Iceberg.

Run:
    pip install drls
    python examples/quickstart.py
"""
import ray
import drls

ray.init()

# Start a Spark-on-Ray cluster with a local Iceberg catalog
spark = drls.init_spark(
    app_name="quickstart",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Basic Spark operations work as normal
df = spark.range(100).toDF("id")
df = df.withColumn("doubled", df["id"] * 2)
print(f"Row count: {df.count()}")  # 100

# Create an Iceberg table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.demo")
spark.sql("""
    CREATE TABLE IF NOT EXISTS drls.demo.numbers (
        id BIGINT,
        doubled BIGINT
    ) USING iceberg
""")
df.writeTo("drls.demo.numbers").overwritePartitions()

# Query it back
result = spark.table("drls.demo.numbers")
print(f"Iceberg table count: {result.count()}")  # 100
result.show(5)

# Convert Spark DataFrame to Ray Dataset
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(f"Ray Dataset count: {ray_ds.count()}")  # 100

# Convert back
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
print(f"Round-trip count: {df2.count()}")  # 100

# Cleanup
drls.stop_spark()
ray.shutdown()
print("Quickstart complete!")

Step-by-Step

Initialize Ray and Spark

ray.init()
spark = drls.init_spark(
    app_name="quickstart",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

init_spark() launches the Spark executors as Ray actors inside the Ray cluster started by ray.init(). The iceberg_catalog and iceberg_warehouse parameters configure a local, filesystem-based Hadoop Iceberg catalog rooted at the given warehouse path.
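A Hadoop Iceberg catalog named drls corresponds to a handful of standard Spark properties. The sketch below shows those properties; the key names are the documented Iceberg-on-Spark settings, but whether init_spark() sets exactly these keys internally is an assumption:

```python
# Standard Spark properties for a Hadoop (filesystem) Iceberg catalog named
# "drls". The keys are documented Iceberg-on-Spark settings; that
# drls.init_spark() wires exactly these is an assumption.
iceberg_conf = {
    "spark.sql.extensions":
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.drls": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.drls.type": "hadoop",
    "spark.sql.catalog.drls.warehouse": "/tmp/drls-warehouse",
}
```

With these properties set on a plain SparkSession, the same drls.demo.numbers DDL would work outside DRLS as well.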

Create and Query Data

df = spark.range(100).toDF("id")
df = df.withColumn("doubled", df["id"] * 2)
print(f"Row count: {df.count()}")  # 100

Standard PySpark operations work unchanged; the only difference is that the executors backing them run as Ray actors.

Create an Iceberg Table

spark.sql("CREATE DATABASE IF NOT EXISTS drls.demo")
spark.sql("""
    CREATE TABLE IF NOT EXISTS drls.demo.numbers (
        id BIGINT, doubled BIGINT
    ) USING iceberg
""")
df.writeTo("drls.demo.numbers").overwritePartitions()

The drls prefix is the name of the Iceberg catalog that init_spark() registers; tables written through it land under the configured warehouse path (/tmp/drls-warehouse). Because numbers is unpartitioned, overwritePartitions() replaces the table's full contents on each run.
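Iceberg also exposes metadata tables (snapshots, history, files) under each table identifier. This is core Iceberg behavior rather than anything DRLS-specific; the queries below are a sketch to be run with the spark session created above:

```python
# Iceberg metadata tables are addressed as <table>.<metadata_table>.
# These queries assume only standard Iceberg behavior; run them with the
# `spark` session created by drls.init_spark() above.
snapshots_sql = "SELECT snapshot_id, committed_at FROM drls.demo.numbers.snapshots"
files_sql = "SELECT file_path, record_count FROM drls.demo.numbers.files"
# spark.sql(snapshots_sql).show()  # one row per commit to the table
# spark.sql(files_sql).show()      # data files behind the current snapshot
```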

Convert Between Runtimes

# Spark → Ray
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(f"Ray Dataset count: {ray_ds.count()}")  # 100

# Ray → Spark
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
print(f"Round-trip count: {df2.count()}")  # 100

These conversions use Arrow for efficient zero-copy transfer when possible.
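Conceptually, a distributed Spark-to-Ray conversion can hand data over partition by partition rather than collecting everything through the driver. The pure-Python model below only illustrates that shape (nothing in it is DRLS API); in DRLS the per-partition payloads are Arrow record batches, which is what makes zero-copy handoff possible:

```python
# Illustration only: model a Spark DataFrame as a list of partitions and a
# Ray Dataset as a list of blocks. A partition-wise handoff preserves the
# rows and their distribution, with no driver-side collect.
def to_blocks(partitions):
    """One block per partition; in DRLS the payload would be an Arrow batch."""
    return [list(rows) for rows in partitions]

partitions = [range(0, 50), range(50, 100)]  # a 100-row, 2-partition frame
blocks = to_blocks(partitions)
print(len(blocks), sum(len(b) for b in blocks))  # 2 100
```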

Cleanup

drls.stop_spark()
ray.shutdown()

Always call stop_spark() before ray.shutdown(): the Spark executors are Ray actors, so tearing Ray down first would kill them before Spark can release its resources cleanly.
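To guarantee that ordering even when the job raises, the two shutdown calls can be wrapped in a finally block. The helper below is a sketch, not DRLS API; stop_spark and shutdown_ray are parameters standing in for drls.stop_spark and ray.shutdown:

```python
import contextlib

def run_job(job, stop_spark, shutdown_ray):
    """Run `job`, then always stop Spark before shutting Ray down."""
    try:
        return job()
    finally:
        # Spark first: its executors are Ray actors and must be released
        # before the Ray cluster that hosts them goes away.
        with contextlib.suppress(Exception):
            stop_spark()
        shutdown_ray()
```

Calling run_job(main, drls.stop_spark, ray.shutdown) then performs the same cleanup as the example's trailing lines, whether or not main raises.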