Streaming Walkthrough¶
File: examples/streaming_iceberg.py
This example demonstrates the bidirectional streaming bridge between Spark Structured Streaming and Ray.
Full Source¶
"""Streaming bridge demo — Spark Structured Streaming to Ray via Iceberg.
Demonstrates the bidirectional streaming bridge:
1. Write to an Iceberg table via Spark
2. Stream from Iceberg CDC to Ray using from_spark_streaming()
3. Process micro-batches in Ray
4. Write results back to Iceberg via to_spark_streaming()
Run:
    pip install drls
    python examples/streaming_iceberg.py
"""
import ray
import drls
from drls.streaming import from_spark_streaming, to_spark_streaming
ray.init()
spark = drls.init_spark(
    app_name="streaming-iceberg",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)
# --- Setup: create source data as a rate stream ---
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)
# --- Forward bridge: Spark → Ray ---
print("Starting Spark → Ray bridge...")
coordinator, query = from_spark_streaming(
    rate_stream,
    coordinator_name="demo-stream",
    max_batch_size=50,
    buffer_size=5,
)
# Consume batches in Ray
import pyarrow as pa
batch_count = 0
total_rows = 0
for batch in coordinator:
    if isinstance(batch, pa.RecordBatch):
        total_rows += batch.num_rows
        batch_count += 1
        print(f" Batch {batch_count}: {batch.num_rows} rows, schema={batch.schema}")
    if batch_count >= 5:
        break
print(f"Consumed {total_rows} rows across {batch_count} batches")
# Stop the forward bridge
query.stop()
# --- Reverse bridge: Ray → Spark ---
print("\nStarting Ray → Spark bridge...")
ray_ds = ray.data.range(100)
reverse_query = to_spark_streaming(
    spark,
    ray_ds,
    source_name="demo-reverse",
    format="console",  # just print to console for demo
    options={"truncate": "false", "numRows": "5"},
    trigger="once",
)
# Wait for the single trigger to complete
reverse_query.awaitTermination(timeout=30)
print("Reverse bridge complete!")
# Cleanup
drls.stop_spark()
ray.shutdown()
print("Streaming demo complete!")
Step-by-Step¶
Forward Bridge: Spark to Ray¶
The forward bridge streams data from Spark Structured Streaming into Ray using Arrow tables:
from drls.streaming import from_spark_streaming
# Create a Spark rate stream as the source
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)
# Bridge to Ray
coordinator, query = from_spark_streaming(
    rate_stream,
    coordinator_name="demo-stream",
    max_batch_size=50,
    buffer_size=5,
)
from_spark_streaming() returns:
- coordinator — A StreamingIterator that yields pyarrow.RecordBatch objects
- query — The Spark StreamingQuery handle for lifecycle management
Consuming Batches¶
import pyarrow as pa

batch_count = 0
total_rows = 0

for batch in coordinator:
    if isinstance(batch, pa.RecordBatch):
        total_rows += batch.num_rows
        batch_count += 1
    if batch_count >= 5:
        break
Each batch is a PyArrow RecordBatch, making it efficient to process in Ray or pass to ML frameworks.
Reverse Bridge: Ray to Spark¶
The reverse bridge streams data from Ray into a Spark Structured Streaming DataFrame:
from drls.streaming import to_spark_streaming
ray_ds = ray.data.range(100)
reverse_query = to_spark_streaming(
spark, ray_ds,
source_name="demo-reverse",
format="console",
options={"truncate": "false", "numRows": "5"},
trigger="once",
)
reverse_query.awaitTermination(timeout=30)
The reverse bridge is useful for writing Ray-processed data back into Iceberg tables.
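As a sketch of that use case, the demo's console sink could be swapped for an Iceberg sink. The table name and option keys below are assumptions (Spark's Iceberg sink typically needs a target table and a `checkpointLocation`); check your Iceberg/Spark versions for the exact contract:

```python
# Hypothetical variant of the demo's reverse bridge writing to Iceberg.
# Table name and option keys are illustrative assumptions.
reverse_query = to_spark_streaming(
    spark,
    ray_ds,
    source_name="demo-reverse-iceberg",
    format="iceberg",
    options={
        "path": "hadoop.db.ray_results",  # assumed target-table option
        "checkpointLocation": "/tmp/drls-checkpoints/reverse",
    },
    trigger="once",
)
reverse_query.awaitTermination(timeout=60)
```

This requires a running Spark session with the Iceberg catalog configured, as in `drls.init_spark()` at the top of the example.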
Key Concepts¶
- StreamCoordinator — A Ray actor that buffers Arrow tables between producer (Spark) and consumer (Ray)
- Backpressure — The coordinator enforces buffer limits to prevent memory exhaustion
- Zero-copy — Data flows as Arrow tables for efficient cross-runtime transfer
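The backpressure idea can be illustrated without Ray or Spark. The sketch below is a conceptual stand-in for the StreamCoordinator, not its implementation: a bounded queue blocks the producer once `buffer_size` items are in flight, which is exactly the memory-protection behavior the coordinator provides between Spark and Ray.

```python
import queue
import threading

# Bounded buffer between a producer (Spark micro-batches) and a consumer
# (Ray). put() blocks once `buffer_size` items are queued — backpressure.
buffer_size = 5
buf: "queue.Queue[object]" = queue.Queue(maxsize=buffer_size)
SENTINEL = object()

def producer():
    for i in range(20):   # 20 fake micro-batches
        buf.put(i)        # blocks while the buffer is full
    buf.put(SENTINEL)     # signal end-of-stream

consumed = []

def consumer():
    while True:
        item = buf.get()
        if item is SENTINEL:
            break
        consumed.append(item)

t_prod = threading.Thread(target=producer)
t_cons = threading.Thread(target=consumer)
t_prod.start(); t_cons.start()
t_prod.join(); t_cons.join()

print(len(consumed))  # 20 — every batch arrives without unbounded buffering
```

A fast producer simply waits for the consumer to drain the buffer; no batch is dropped and memory stays bounded at `buffer_size` batches.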