Skip to content

Streaming Walkthrough

File: examples/streaming_iceberg.py

This example demonstrates the bidirectional streaming bridge between Spark Structured Streaming and Ray.

Full Source

"""Streaming bridge demo — Spark Structured Streaming to Ray via Iceberg.

Demonstrates the bidirectional streaming bridge:
1. Write to an Iceberg table via Spark
2. Stream from Iceberg CDC to Ray using from_spark_streaming()
3. Process micro-batches in Ray
4. Write results back to Iceberg via to_spark_streaming()

Run:
    pip install drls
    python examples/streaming_iceberg.py
"""
import ray
import drls
from drls.streaming import from_spark_streaming, to_spark_streaming

ray.init()

spark = drls.init_spark(
    app_name="streaming-iceberg",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# --- Setup: create source data as a rate stream ---
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)

# --- Forward bridge: Spark → Ray ---
print("Starting Spark → Ray bridge...")
coordinator, query = from_spark_streaming(
    rate_stream,
    coordinator_name="demo-stream",
    max_batch_size=50,
    buffer_size=5,
)

# Consume batches in Ray
import pyarrow as pa

batch_count = 0
total_rows = 0
for batch in coordinator:
    if isinstance(batch, pa.RecordBatch):
        total_rows += batch.num_rows
        batch_count += 1
        print(f"  Batch {batch_count}: {batch.num_rows} rows, schema={batch.schema}")
    if batch_count >= 5:
        break

print(f"Consumed {total_rows} rows across {batch_count} batches")

# Stop the forward bridge
query.stop()

# --- Reverse bridge: Ray → Spark ---
print("\nStarting Ray → Spark bridge...")
ray_ds = ray.data.range(100)

reverse_query = to_spark_streaming(
    spark,
    ray_ds,
    source_name="demo-reverse",
    format="console",  # just print to console for demo
    options={"truncate": "false", "numRows": "5"},
    trigger="once",
)

# Wait for the single trigger to complete
reverse_query.awaitTermination(timeout=30)
print("Reverse bridge complete!")

# Cleanup
drls.stop_spark()
ray.shutdown()
print("Streaming demo complete!")

Step-by-Step

Forward Bridge: Spark to Ray

The forward bridge streams data from Spark Structured Streaming into Ray using Arrow tables:

from drls.streaming import from_spark_streaming

# Create a Spark rate stream as the source
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)

# Bridge to Ray
coordinator, query = from_spark_streaming(
    rate_stream,
    coordinator_name="demo-stream",
    max_batch_size=50,
    buffer_size=5,
)

from_spark_streaming() returns:

  • coordinator — A StreamingIterator that yields pyarrow.RecordBatch objects
  • query — The Spark StreamingQuery handle for lifecycle management

Consuming Batches

for batch in coordinator:
    if isinstance(batch, pa.RecordBatch):
        total_rows += batch.num_rows
        batch_count += 1
    if batch_count >= 5:
        break

Each batch is a PyArrow RecordBatch, making it efficient to process in Ray or pass to ML frameworks.

Reverse Bridge: Ray to Spark

The reverse bridge streams data from Ray into a Spark Structured Streaming DataFrame:

from drls.streaming import to_spark_streaming

ray_ds = ray.data.range(100)
reverse_query = to_spark_streaming(
    spark, ray_ds,
    source_name="demo-reverse",
    format="console",
    options={"truncate": "false", "numRows": "5"},
    trigger="once",
)
reverse_query.awaitTermination(timeout=30)

The reverse bridge is useful for writing Ray-processed data back into Iceberg tables.

Key Concepts

  • StreamCoordinator — A Ray actor that buffers Arrow tables between producer (Spark) and consumer (Ray)
  • Backpressure — The coordinator enforces buffer limits to prevent memory exhaustion
  • Zero-copy — Data flows as Arrow tables for efficient cross-runtime transfer