Change Data Capture (CDC)

DRLS supports reading Iceberg CDC (Change Data Capture) feeds and streaming changes to Ray for processing.

Read CDC Stream

Create a streaming DataFrame from Iceberg CDC changes:

```python
from drls.iceberg import read_cdc_stream

cdc_df = read_cdc_stream(
    spark,
    "drls.db.users",
    starting_snapshot_id=1234567890,  # Optional: start from a specific snapshot
)
cdc_df.show(truncate=False)
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | required | Active Spark session |
| `table` | `str` | required | Fully qualified table name |
| `starting_snapshot_id` | `int \| None` | `None` | Start reading CDC from this snapshot |

The returned DataFrame includes Iceberg CDC metadata columns:

| Column | Description |
|---|---|
| `_change_type` | One of `insert`, `delete`, `update_before`, `update_after` |
| `_change_ordinal` | Order of the change within the commit |
| `_commit_snapshot_id` | Snapshot ID where the change occurred |
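
These metadata columns are enough to replay the feed against downstream state. Below is a minimal pure-Python sketch, not part of the drls API, that applies a list of CDC rows (shaped like the columns above, with hypothetical `id`/`name` data columns) to an in-memory keyed dict:

```python
# Illustrative sketch: apply Iceberg-style CDC rows to an in-memory keyed store.
# The dict rows mimic the metadata columns produced by read_cdc_stream.
def apply_changes(state, changes):
    # Replay in commit order, then in change order within each commit.
    ordered = sorted(changes, key=lambda r: (r["_commit_snapshot_id"], r["_change_ordinal"]))
    for row in ordered:
        if row["_change_type"] in ("insert", "update_after"):
            state[row["id"]] = row["name"]   # upsert the new image of the row
        elif row["_change_type"] == "delete":
            state.pop(row["id"], None)       # drop deleted rows
        # update_before carries the old image; nothing to apply here
    return state

changes = [
    {"id": 1, "name": "Alice", "_change_type": "insert", "_change_ordinal": 0, "_commit_snapshot_id": 10},
    {"id": 1, "name": "Alice", "_change_type": "update_before", "_change_ordinal": 0, "_commit_snapshot_id": 11},
    {"id": 1, "name": "Alicia", "_change_type": "update_after", "_change_ordinal": 1, "_commit_snapshot_id": 11},
    {"id": 2, "name": "Bob", "_change_type": "insert", "_change_ordinal": 0, "_commit_snapshot_id": 11},
    {"id": 2, "name": "Bob", "_change_type": "delete", "_change_ordinal": 0, "_commit_snapshot_id": 12},
]
print(apply_changes({}, changes))  # {1: 'Alicia'}
```

Note that an update arrives as an `update_before`/`update_after` pair sharing a commit; a consumer that only materializes current state can ignore the `update_before` image.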

Stream CDC to Ray

Stream CDC changes directly to a Ray Dataset for processing:

```python
from drls.iceberg import cdc_to_ray

ray_ds = cdc_to_ray(
    spark,
    "drls.db.users",
    starting_snapshot_id=1234567890,
)

for row in ray_ds.take(10):
    print(row)
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | required | Active Spark session |
| `table` | `str` | required | Fully qualified table name |
| `starting_snapshot_id` | `int \| None` | `None` | Start from this snapshot |

Returns: a `ray.data.Dataset` containing the CDC rows.
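
Each row materialized from the Ray Dataset is a plain record, so commit-aware post-processing needs no Spark. A small sketch (the rows shown are hypothetical, not drls output) that groups rows by the commit snapshot that produced them, so each commit's changes can be handled as one atomic batch:

```python
from collections import defaultdict

def batch_by_commit(rows):
    # Bucket CDC rows by _commit_snapshot_id, then emit batches in commit order.
    batches = defaultdict(list)
    for row in rows:
        batches[row["_commit_snapshot_id"]].append(row)
    return [batches[sid] for sid in sorted(batches)]

rows = [
    {"id": 4, "_change_type": "insert", "_commit_snapshot_id": 200},
    {"id": 2, "_change_type": "delete", "_commit_snapshot_id": 300},
    {"id": 5, "_change_type": "insert", "_commit_snapshot_id": 200},
]
for batch in batch_by_commit(rows):
    print(batch[0]["_commit_snapshot_id"], len(batch))
# 200 2
# 300 1
```

In a real pipeline the input would come from something like `ray_ds.take_all()` or a `map_batches` stage rather than a literal list.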

Example

"""CDC (Change Data Capture) pipeline with Iceberg.

Demonstrates:
1. Creating a table and inserting data
2. Making changes (inserts, updates, deletes)
3. Reading the CDC feed
4. Streaming CDC to Ray for processing

Run:
    pip install drls
    python examples/cdc_pipeline.py
"""
import ray
import drls
from drls.iceberg import read_cdc_stream, cdc_to_ray

ray.init()

spark = drls.init_spark(
    app_name="cdc-pipeline",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Create a table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.cdc_demo")
spark.sql("DROP TABLE IF EXISTS drls.cdc_demo.users")
spark.sql("""
    CREATE TABLE drls.cdc_demo.users (
        id BIGINT,
        name STRING,
        email STRING
    ) USING iceberg
    TBLPROPERTIES (
        'write.wap.enabled' = 'true'
    )
""")

# Insert initial data
spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
    (1, 'Alice', 'alice@example.com'),
    (2, 'Bob', 'bob@example.com'),
    (3, 'Charlie', 'charlie@example.com')
""")
print("Inserted 3 users")

# Get the first snapshot id for CDC baseline
from drls.iceberg import history
snapshots = history(spark, "drls.cdc_demo.users")
first_snapshot = snapshots["snapshots"][0]["snapshot_id"]
print(f"First snapshot: {first_snapshot}")

# Make changes
spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
    (4, 'Diana', 'diana@example.com')
""")
spark.sql("""
    DELETE FROM drls.cdc_demo.users WHERE id = 2
""")
print("Applied changes: +1 insert, -1 delete")

# Read CDC changes since the first snapshot
print("\nCDC changes:")
cdc_df = read_cdc_stream(spark, "drls.cdc_demo.users",
                         starting_snapshot_id=first_snapshot)
cdc_df.show(truncate=False)

# Stream CDC to Ray for processing
print("Streaming CDC to Ray...")
ray_ds = cdc_to_ray(spark, "drls.cdc_demo.users",
                    starting_snapshot_id=first_snapshot)
for row in ray_ds.take(10):
    print(f"  {row}")

# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nCDC pipeline demo complete!")

Table Properties

To enable CDC on an Iceberg table, set the WAP (Write-Audit-Publish) property:

```sql
CREATE TABLE drls.db.users (
    id BIGINT,
    name STRING,
    email STRING
) USING iceberg
TBLPROPERTIES (
    'write.wap.enabled' = 'true'
)
```