# Streaming Bridge
DRLS provides a bidirectional streaming bridge between Spark Structured Streaming and Ray, using Arrow tables for zero-copy data transfer.
## Architecture
graph LR
subgraph "Forward: Spark → Ray"
SS["Spark Streaming"] -->|foreachBatch| Sink["SparkStreamingSink"]
Sink -->|Arrow tables| SC["StreamCoordinator<br/>(Ray Actor)"]
SC -->|pull| SI["StreamingIterator"]
end
graph LR
subgraph "Reverse: Ray → Spark"
RS["Ray Source"] --> PT["ProducerThread"]
PT -->|Arrow tables| SC2["StreamCoordinator<br/>(Ray Actor)"]
SC2 --> JB["JvmBridgeThread"]
JB -->|IPC| SS2["Spark Streaming DF"]
end
The StreamCoordinator is a named Ray actor that handles buffering, backpressure, and partition routing.
## Forward Bridge: Spark to Ray
Stream data from Spark Structured Streaming to Ray consumers:
from drls.streaming import from_spark_streaming

# Create a Spark streaming source
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 100)
    .load()
)

# Bridge to Ray
iterator, query = from_spark_streaming(
    rate_stream,
    stream_id="my-stream",
    max_buffered_batches=64,
    max_buffered_bytes=2 * 1024**3,
)

# Consume in Ray
for batch in iterator:
    print(f"Got {batch.num_rows} rows, schema: {batch.schema}")
    # Process Arrow table...

# Stop when done
query.stop()
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `streaming_df` | Spark DataFrame | — | Spark Structured Streaming DataFrame |
| `stream_id` | `str \| None` | auto | Name for the Ray coordinator actor |
| `max_buffered_batches` | `int` | 64 | Maximum buffered Arrow batches |
| `max_buffered_bytes` | `int` | 2 GB | Maximum buffer size |
| `trigger` | — | None | Spark trigger configuration |
| `checkpoint_location` | `str \| None` | None | Streaming checkpoint path |
| `use_jvm_sink` | `bool` | False | Use distributed JVM sink (bypasses driver bottleneck) |
Returns: (StreamingIterator, StreamingQueryWrapper)
## Reverse Bridge: Ray to Spark
Stream data from Ray into a Spark Structured Streaming DataFrame:
from drls.streaming import to_spark_streaming
import pyarrow as pa

# Define a source that yields Arrow tables
def my_source():
    while True:
        yield pa.table({"value": [1, 2, 3]})

# Bridge to Spark
df, handle = to_spark_streaming(
    source=my_source(),
    spark=spark,
    stream_id="reverse-stream",
)

# Use df as a Spark streaming DataFrame
query = (
    df.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("path", "drls.db.output")
    .start()
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `source` | `Iterator[pa.Table]` or callable | — | Arrow table source |
| `spark` | `SparkSession` | — | Active Spark session |
| `stream_id` | `str` | auto | Name for the coordinator |
| `max_buffered_batches` | `int` | 64 | Buffer capacity |
| `max_buffered_bytes` | `int` | 2 GB | Buffer size limit |
Returns: (DataFrame, _ReverseStreamHandle)
## Partitioned Consumers
Create multiple consumers with partition-based routing for parallel processing:
from drls.streaming import from_spark_streaming, create_partitioned_iterators

iterator, query = from_spark_streaming(streaming_df)

# Create 4 consumers, each getting a subset of partitions
consumers = create_partitioned_iterators(
    coordinator=iterator._coordinator,
    num_consumers=4,
    num_partitions=16,
)

# Process in parallel with Ray tasks
import ray

@ray.remote
def process_partition(consumer):
    for batch in consumer:
        # process batch...
        pass

futures = [process_partition.remote(c) for c in consumers]
ray.get(futures)
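The partition-to-consumer assignment behind this pattern can be sketched in plain Python. This is an illustration of modular round-robin routing, not the actual `create_partitioned_iterators` implementation; the function names here are hypothetical:

```python
def assign_partitions(num_consumers, num_partitions):
    """Round-robin assignment: consumer i owns every partition p
    where p % num_consumers == i."""
    return [
        [p for p in range(num_partitions) if p % num_consumers == i]
        for i in range(num_consumers)
    ]

def route(key, num_partitions):
    """Hash-based routing of a record key to a partition index."""
    return hash(key) % num_partitions

# With 4 consumers and 16 partitions, each consumer owns 4 partitions
assignment = assign_partitions(num_consumers=4, num_partitions=16)
```

Any deterministic key-to-partition mapping works here; the important property is that every partition is owned by exactly one consumer, so batches are never processed twice.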
## Backpressure
The StreamCoordinator enforces backpressure via two limits:
- `max_buffered_batches` — Maximum number of Arrow batches held in the buffer
- `max_buffered_bytes` — Maximum total size of buffered data
When either limit is reached, the producer (Spark side) blocks until consumers drain the buffer.
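This blocking behavior can be sketched as a plain-Python bounded buffer. The class below is illustrative only (the real StreamCoordinator is a Ray actor and its internals may differ); it shows how both limits combine to stall the producer:

```python
import threading
from collections import deque

class StreamBufferSketch:
    """Illustrative bounded buffer enforcing both backpressure limits.

    A stand-in for the coordinator's internal buffer; all names here
    are hypothetical, not the DRLS implementation.
    """

    def __init__(self, max_batches=64, max_bytes=2 * 1024**3):
        self.max_batches = max_batches
        self.max_bytes = max_bytes
        self._queue = deque()
        self._bytes = 0
        self._cond = threading.Condition()

    def put(self, batch, nbytes):
        """Producer side: block while either limit would be exceeded."""
        with self._cond:
            while (len(self._queue) >= self.max_batches
                   or self._bytes + nbytes > self.max_bytes):
                self._cond.wait()
            self._queue.append((batch, nbytes))
            self._bytes += nbytes
            self._cond.notify_all()

    def get(self):
        """Consumer side: block until a batch is available, then drain it."""
        with self._cond:
            while not self._queue:
                self._cond.wait()
            batch, nbytes = self._queue.popleft()
            self._bytes -= nbytes
            self._cond.notify_all()
            return batch
```

Because `put` waits on the same condition variable that `get` notifies, a slow consumer naturally throttles the Spark producer instead of letting memory grow without bound.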
## Monitoring
Monitor streaming statistics from the terminal:
from drls.streaming.monitor import print_stats

# Print stats every 2 seconds (coordinator is the StreamCoordinator actor,
# e.g. iterator._coordinator from from_spark_streaming)
print_stats(coordinator, interval=2.0)
For production monitoring, the StreamCoordinator exposes Prometheus-compatible metrics via get_stats().
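A minimal polling loop over `get_stats()` might look like the sketch below. The stats keys shown are illustrative assumptions, not the coordinator's documented return value, and the stub class stands in for the real actor (with the actual StreamCoordinator you would call `ray.get(coordinator.get_stats.remote())`):

```python
import time

def poll_stats(coordinator, interval=2.0, iterations=3):
    """Periodically fetch and print coordinator statistics.

    `coordinator` is anything exposing get_stats() -> dict.
    """
    history = []
    for _ in range(iterations):
        stats = coordinator.get_stats()
        history.append(stats)
        print(f"buffered={stats.get('buffered_batches')} "
              f"bytes={stats.get('buffered_bytes')}")
        time.sleep(interval)
    return history

# Stub coordinator with made-up stats, for illustration only
class StubCoordinator:
    def get_stats(self):
        return {"buffered_batches": 3, "buffered_bytes": 4096}

history = poll_stats(StubCoordinator(), interval=0.0, iterations=2)
```

The same loop can feed a Prometheus exporter or any other metrics pipeline, since each sample is just a dictionary.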
## Example
"""Streaming bridge demo — Spark Structured Streaming to Ray via Iceberg.
Demonstrates the bidirectional streaming bridge:
1. Write to an Iceberg table via Spark
2. Stream from Iceberg CDC to Ray using from_spark_streaming()
3. Process micro-batches in Ray
4. Write results back to Iceberg via to_spark_streaming()
Run:
pip install drls
python examples/streaming_iceberg.py
"""
import ray
import drls
from drls.streaming import from_spark_streaming, to_spark_streaming
ray.init()
spark = drls.init_spark(
    app_name="streaming-iceberg",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)
# --- Setup: create source data as a rate stream ---
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)
# --- Forward bridge: Spark → Ray ---
print("Starting Spark → Ray bridge...")
iterator, query = from_spark_streaming(
    rate_stream,
    stream_id="demo-stream",
    max_buffered_batches=5,
)

# Consume batches in Ray
import pyarrow as pa

batch_count = 0
total_rows = 0
for batch in iterator:
    if isinstance(batch, pa.Table):
        total_rows += batch.num_rows
        batch_count += 1
        print(f"  Batch {batch_count}: {batch.num_rows} rows, schema={batch.schema}")
    if batch_count >= 5:
        break
print(f"Consumed {total_rows} rows across {batch_count} batches")
# Stop the forward bridge
query.stop()
# --- Reverse bridge: Ray → Spark ---
print("\nStarting Ray → Spark bridge...")
def ray_source():
    # Yield Arrow tables from a Ray Dataset
    ds = ray.data.range(100)
    for batch in ds.iter_batches(batch_format="pyarrow"):
        yield batch

df, handle = to_spark_streaming(
    source=ray_source(),
    spark=spark,
    stream_id="demo-reverse",
)
reverse_query = (
    df.writeStream
    .format("console")  # just print to console for demo
    .option("truncate", "false")
    .option("numRows", "5")
    .trigger(once=True)
    .start()
)
# Wait for the single trigger to complete
reverse_query.awaitTermination(timeout=30)
print("Reverse bridge complete!")
# Cleanup
drls.stop_spark()
ray.shutdown()
print("Streaming demo complete!")