Streaming Bridge

DRLS provides a bidirectional streaming bridge between Spark Structured Streaming and Ray, using Arrow tables for zero-copy data transfer.

Architecture

graph LR
    subgraph "Forward: Spark → Ray"
        SS["Spark Streaming"] -->|foreachBatch| Sink["SparkStreamingSink"]
        Sink -->|Arrow tables| SC["StreamCoordinator<br/>(Ray Actor)"]
        SC -->|pull| SI["StreamingIterator"]
    end

graph LR
    subgraph "Reverse: Ray → Spark"
        RS["Ray Source"] --> PT["ProducerThread"]
        PT -->|Arrow tables| SC2["StreamCoordinator<br/>(Ray Actor)"]
        SC2 --> JB["JvmBridgeThread"]
        JB -->|IPC| SS2["Spark Streaming DF"]
    end

The StreamCoordinator is a named Ray actor that handles buffering, backpressure, and partition routing.

Forward Bridge: Spark to Ray

Stream data from Spark Structured Streaming to Ray consumers:

from drls.streaming import from_spark_streaming

# Create a Spark streaming source
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 100)
    .load()
)

# Bridge to Ray
iterator, query = from_spark_streaming(
    rate_stream,
    stream_id="my-stream",
    max_buffered_batches=64,
    max_buffered_bytes=2 * 1024**3,
)

# Consume in Ray
for batch in iterator:
    print(f"Got {batch.num_rows} rows, schema: {batch.schema}")
    # Process Arrow table...

# Stop when done
query.stop()

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| streaming_df | Spark DataFrame | | Spark Structured Streaming DataFrame |
| stream_id | str \| None | auto | Name for the Ray coordinator actor |
| max_buffered_batches | int | 64 | Maximum buffered Arrow batches |
| max_buffered_bytes | int | 2 GB | Maximum buffer size |
| trigger | | None | Spark trigger configuration |
| checkpoint_location | str \| None | None | Streaming checkpoint path |
| use_jvm_sink | bool | False | Use distributed JVM sink (bypasses driver bottleneck) |

Returns: (StreamingIterator, StreamingQueryWrapper)
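Because a streaming source is unbounded, the consuming loop needs an explicit exit and the query should be stopped even if processing fails. The following is a minimal sketch of that pattern, not part of DRLS: `consume_n`, `FakeQuery`, and `endless_batches` are hypothetical names, and the stand-ins exist only so the snippet runs without Spark or Ray. It assumes nothing beyond what the example above shows: the first return value is iterable and the second has a `stop()` method.

```python
from itertools import islice


def consume_n(iterator, query, max_batches, handle_batch):
    """Process at most max_batches items, then stop the query even on error."""
    processed = 0
    try:
        for batch in islice(iterator, max_batches):
            handle_batch(batch)
            processed += 1
    finally:
        # Always release the streaming query, whether the loop finished or raised.
        query.stop()
    return processed


# Stand-ins (hypothetical) so the sketch runs without Spark or Ray.
class FakeQuery:
    stopped = False

    def stop(self):
        self.stopped = True


def endless_batches():
    n = 0
    while True:
        yield {"batch_id": n}
        n += 1


fake_query = FakeQuery()
seen = []
count = consume_n(endless_batches(), fake_query, 3, seen.append)
print(count, fake_query.stopped)
```

With the real bridge, the `iterator` and `query` returned by `from_spark_streaming` would take the place of the stand-ins.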

Reverse Bridge: Ray to Spark

Stream data from Ray into a Spark Structured Streaming DataFrame:

from drls.streaming import to_spark_streaming
import pyarrow as pa

# Define a source that yields Arrow tables
def my_source():
    while True:
        yield pa.table({"value": [1, 2, 3]})

# Bridge to Spark
df, handle = to_spark_streaming(
    source=my_source(),
    spark=spark,
    stream_id="reverse-stream",
)

# Use df as a Spark streaming DataFrame
query = (
    df.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("path", "drls.db.output")
    .start()
)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| source | Iterator[pa.Table] or callable | | Arrow table source |
| spark | SparkSession | | Active Spark session |
| stream_id | str | auto | Name for the coordinator |
| max_buffered_batches | int | 64 | Buffer capacity |
| max_buffered_bytes | int | 2 GB | Buffer size limit |

Returns: (DataFrame, _ReverseStreamHandle)

Partitioned Consumers

Create multiple consumers with partition-based routing for parallel processing:

from drls.streaming import from_spark_streaming, create_partitioned_iterators

iterator, query = from_spark_streaming(streaming_df)

# Create 4 consumers, each getting a subset of partitions
consumers = create_partitioned_iterators(
    coordinator=iterator._coordinator,
    num_consumers=4,
    num_partitions=16,
)

# Process in parallel with Ray tasks
import ray

@ray.remote
def process_partition(consumer):
    for batch in consumer:
        # process batch...
        pass

futures = [process_partition.remote(c) for c in consumers]
ray.get(futures)
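
This page does not state how the 16 partitions are divided among the 4 consumers. As an illustration only, the sketch below assumes a round-robin (modulo) assignment; `assign_partitions` is a hypothetical helper, not a DRLS function, and the actual routing inside the StreamCoordinator may differ.

```python
def assign_partitions(num_partitions, num_consumers):
    """Return one list of partition ids per consumer, assigned round-robin."""
    if num_consumers < 1 or num_partitions < num_consumers:
        raise ValueError("need at least one partition per consumer")
    assignments = [[] for _ in range(num_consumers)]
    for partition_id in range(num_partitions):
        # Partition p goes to consumer p mod num_consumers (assumed scheme).
        assignments[partition_id % num_consumers].append(partition_id)
    return assignments


assignments = assign_partitions(num_partitions=16, num_consumers=4)
for consumer_index, partition_ids in enumerate(assignments):
    print(f"consumer {consumer_index}: partitions {partition_ids}")
```

Under this assumed scheme every partition belongs to exactly one consumer, so no batch is processed twice.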

Backpressure

The StreamCoordinator enforces backpressure via two limits:

  • max_buffered_batches — Maximum number of Arrow batches held in the buffer
  • max_buffered_bytes — Maximum total size of buffered data

When either limit is reached, the producer (Spark side) blocks until consumers drain the buffer.
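
The two-limit rule can be sketched with a plain in-process buffer. This is a toy model under stated assumptions, not the StreamCoordinator implementation: `BoundedBatchBuffer` is a hypothetical class, it uses a `threading.Condition` rather than a Ray actor, and the choice to always admit one batch into an empty buffer (so an oversized batch cannot deadlock) is an assumption of this sketch.

```python
import threading
from collections import deque


class BoundedBatchBuffer:
    """Toy buffer that blocks producers at a batch-count or byte limit."""

    def __init__(self, max_buffered_batches=64, max_buffered_bytes=2 * 1024**3):
        self.max_buffered_batches = max_buffered_batches
        self.max_buffered_bytes = max_buffered_bytes
        self._items = deque()  # (batch, nbytes) pairs in arrival order
        self._bytes = 0
        self._cond = threading.Condition()

    def _has_room(self, nbytes):
        # An empty buffer always accepts one batch, so a single batch larger
        # than the byte limit cannot block the producer forever.
        if not self._items:
            return True
        return (
            len(self._items) < self.max_buffered_batches
            and self._bytes + nbytes <= self.max_buffered_bytes
        )

    def put(self, batch, nbytes, timeout=None):
        """Block until there is room; return False if the wait timed out."""
        with self._cond:
            if not self._cond.wait_for(lambda: self._has_room(nbytes), timeout):
                return False
            self._items.append((batch, nbytes))
            self._bytes += nbytes
            self._cond.notify_all()  # wake consumers waiting for data
            return True

    def get(self, timeout=None):
        """Block until a batch is available; return None if the wait timed out."""
        with self._cond:
            if not self._cond.wait_for(lambda: bool(self._items), timeout):
                return None
            batch, nbytes = self._items.popleft()
            self._bytes -= nbytes
            self._cond.notify_all()  # wake producers waiting for room
            return batch
```

A producer calling `put` on a full buffer waits until a consumer calls `get`, which is the blocking behaviour described above, reduced to a single process.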

Monitoring

Monitor streaming statistics from the terminal:

from drls.streaming.monitor import print_stats

# Print stats every 2 seconds
print_stats(coordinator, interval=2.0)

For production monitoring, the StreamCoordinator exposes Prometheus-compatible metrics via get_stats().
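
The return shape of get_stats() is not documented on this page. The sketch below assumes it yields a flat dict of numeric values and shows one way such a dict could be rendered in the Prometheus text exposition format. `to_prometheus_text`, the `drls_stream` prefix, and the example keys are all hypothetical, and label values are not escaped here.

```python
def to_prometheus_text(stats, prefix="drls_stream", labels=None):
    """Render a flat dict of numeric stats in Prometheus text exposition format."""
    label_str = ""
    if labels:
        pairs = ",".join(f'{key}="{value}"' for key, value in sorted(labels.items()))
        label_str = "{" + pairs + "}"
    lines = []
    for name, value in sorted(stats.items()):
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue  # skip non-numeric entries such as names or flags
        metric = f"{prefix}_{name}"
        lines.append(f"# TYPE {metric} gauge")
        lines.append(f"{metric}{label_str} {value}")
    return "\n".join(lines) + "\n"


# Hypothetical stats dict; the real keys returned by get_stats() may differ.
example_stats = {"buffered_batches": 3, "buffered_bytes": 4096, "stream_id": "my-stream"}
print(to_prometheus_text(example_stats, labels={"stream": "my-stream"}))
```

The rendered text could then be served from whatever HTTP endpoint a Prometheus server is configured to scrape.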

Example

"""Streaming bridge demo — Spark Structured Streaming to Ray via Iceberg.

Demonstrates the bidirectional streaming bridge:
1. Generate source rows with a Spark rate stream
2. Stream them to Ray using from_spark_streaming()
3. Process micro-batches in Ray
4. Stream a Ray dataset back into Spark using to_spark_streaming()

Run:
    pip install drls
    python examples/streaming_iceberg.py
"""
import ray
import drls
from drls.streaming import from_spark_streaming, to_spark_streaming

ray.init()

spark = drls.init_spark(
    app_name="streaming-iceberg",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# --- Setup: create source data as a rate stream ---
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)

# --- Forward bridge: Spark → Ray ---
print("Starting Spark → Ray bridge...")
# Keyword names follow the from_spark_streaming() parameter table above
iterator, query = from_spark_streaming(
    rate_stream,
    stream_id="demo-stream",
    max_buffered_batches=5,
)

# Consume batches in Ray
import pyarrow as pa

batch_count = 0
total_rows = 0
for batch in iterator:
    # The bridge delivers Arrow data; accept either a Table or a RecordBatch
    if isinstance(batch, (pa.Table, pa.RecordBatch)):
        total_rows += batch.num_rows
        batch_count += 1
        print(f"  Batch {batch_count}: {batch.num_rows} rows, schema={batch.schema}")
    if batch_count >= 5:
        break

print(f"Consumed {total_rows} rows across {batch_count} batches")

# Stop the forward bridge
query.stop()

# --- Reverse bridge: Ray → Spark ---
print("\nStarting Ray → Spark bridge...")
ray_ds = ray.data.range(100)

# to_spark_streaming() takes a source of Arrow tables and returns (DataFrame, handle)
df, handle = to_spark_streaming(
    source=iter(ray_ds.iter_batches(batch_format="pyarrow")),
    spark=spark,
    stream_id="demo-reverse",
)

reverse_query = (
    df.writeStream
    .format("console")  # just print to console for demo
    .option("truncate", "false")
    .option("numRows", "5")
    .trigger(once=True)
    .start()
)

# Wait for the single trigger to complete
reverse_query.awaitTermination(timeout=30)
print("Reverse bridge complete!")

# Cleanup
drls.stop_spark()
ray.shutdown()
print("Streaming demo complete!")