
Quickstart

This guide walks through initializing a Spark-on-Ray session, running a query, and creating an Iceberg table.

Basic Usage

"""DRLS Quickstart — Spark-on-Ray with Iceberg.

Run:
    pip install drls
    python examples/quickstart.py
"""
import ray
import drls

ray.init()

# Start a Spark-on-Ray cluster with a local Iceberg catalog
spark = drls.init_spark(
    app_name="quickstart",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Basic Spark operations work as normal
df = spark.range(100).toDF("id")
df = df.withColumn("doubled", df["id"] * 2)
print(f"Row count: {df.count()}")  # 100

# Create an Iceberg table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.demo")
spark.sql("""
    CREATE TABLE IF NOT EXISTS drls.demo.numbers (
        id BIGINT,
        doubled BIGINT
    ) USING iceberg
""")
df.writeTo("drls.demo.numbers").overwritePartitions()

# Query it back
result = spark.table("drls.demo.numbers")
print(f"Iceberg table count: {result.count()}")  # 100
result.show(5)

# Convert Spark DataFrame to Ray Dataset
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(f"Ray Dataset count: {ray_ds.count()}")  # 100

# Convert back
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
print(f"Round-trip count: {df2.count()}")  # 100

# Cleanup
drls.stop_spark()
ray.shutdown()
print("Quickstart complete!")

Step by Step

1. Initialize Ray

import ray
ray.init()

This starts a local Ray cluster. In production, connect to an existing cluster with ray.init(address="auto").

2. Create a Spark Session

import drls

spark = drls.init_spark(
    app_name="quickstart",
    num_executors=1,
    executor_cores=1,
    executor_memory="500M",
)

This launches Spark executors as Ray actors. Parameters:

Parameter          Description
app_name           Spark application name
num_executors      Number of Spark executors (Ray actors)
executor_cores     CPU cores per executor
executor_memory    Memory per executor (e.g., "500M", "2G")
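Because each executor runs as a Ray actor, a session needs roughly num_executors × executor_cores CPUs and num_executors × executor_memory of cluster capacity. A minimal sketch for checking a configuration fits before calling init_spark (parse_memory and required_resources are illustrative helpers, not part of the drls API):

```python
def parse_memory(size: str) -> int:
    """Convert a Spark-style size string like '512M' or '2G' to bytes."""
    units = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
    size = size.strip().upper().rstrip("B")
    if size[-1] in units:
        return int(float(size[:-1]) * units[size[-1]])
    return int(size)  # bare number: already bytes


def required_resources(num_executors: int, executor_cores: int,
                       executor_memory: str) -> dict:
    """Total CPU and memory the executor actors will request."""
    return {
        "CPU": num_executors * executor_cores,
        "memory_bytes": num_executors * parse_memory(executor_memory),
    }


print(required_resources(2, 1, "512M"))
# {'CPU': 2, 'memory_bytes': 1073741824}
```

Compare the result against ray.cluster_resources() on your cluster to avoid requesting more than Ray can schedule.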

3. Run Queries

Use the standard PySpark DataFrame and SQL APIs:

df = spark.range(10)
print(df.count())  # 10

4. Clean Up

drls.stop_spark()
ray.shutdown()

With Iceberg

Enable Iceberg by passing catalog configuration to init_spark:

spark = drls.init_spark(
    app_name="iceberg-demo",
    num_executors=1,
    executor_cores=1,
    executor_memory="500M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)
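Under the hood, a Hadoop catalog named drls corresponds to the standard Iceberg Spark session properties shown below. The exact keys drls sets internally are an assumption; these are the upstream Iceberg property names, useful when debugging catalog configuration:

```properties
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.drls=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.drls.type=hadoop
spark.sql.catalog.drls.warehouse=/tmp/drls-warehouse
```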

# Create a namespace and table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.db")
spark.sql("""
    CREATE TABLE IF NOT EXISTS drls.db.events (
        id BIGINT,
        ts TIMESTAMP,
        payload STRING
    ) USING iceberg
""")

# Insert data
spark.sql("INSERT INTO drls.db.events VALUES (1, current_timestamp(), 'hello')")

# Query
spark.sql("SELECT * FROM drls.db.events").show()

See Iceberg Operations for compaction, schema evolution, time travel, and more.

Using the CLI

DRLS includes a CLI for common operations:

# List tables
drls tables --catalog hadoop --warehouse /tmp/warehouse

# Check table health
drls health drls.db.events --catalog hadoop --warehouse /tmp/warehouse

# Run compaction
drls compact drls.db.events --strategy binpack --catalog hadoop --warehouse /tmp/warehouse

See the CLI Guide for all commands.

Next Steps