CDC Walkthrough

File: examples/cdc_pipeline.py

This example demonstrates Change Data Capture (CDC) with Iceberg: it records row-level changes to a table (here, an insert and a delete), reads them back as a change feed, and streams that feed to Ray.

Full Source

"""CDC (Change Data Capture) pipeline with Iceberg.

Demonstrates:
1. Creating a table and inserting data
2. Making changes (inserts, updates, deletes)
3. Reading the CDC feed
4. Streaming CDC to Ray for processing

Run:
    pip install drls
    python examples/cdc_pipeline.py
"""
import ray
import drls
from drls.iceberg import read_cdc_stream, cdc_to_ray

ray.init()

spark = drls.init_spark(
    app_name="cdc-pipeline",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Create a table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.cdc_demo")
spark.sql("DROP TABLE IF EXISTS drls.cdc_demo.users")
spark.sql("""
    CREATE TABLE drls.cdc_demo.users (
        id BIGINT,
        name STRING,
        email STRING
    ) USING iceberg
    TBLPROPERTIES (
        'write.wap.enabled' = 'true'
    )
""")

# Insert initial data
spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
    (1, 'Alice', 'alice@example.com'),
    (2, 'Bob', 'bob@example.com'),
    (3, 'Charlie', 'charlie@example.com')
""")
print("Inserted 3 users")

# Get the first snapshot id for CDC baseline
from drls.iceberg import history
snapshots = history(spark, "drls.cdc_demo.users")
first_snapshot = snapshots["snapshots"][0]["snapshot_id"]
print(f"First snapshot: {first_snapshot}")

# Make changes
spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
    (4, 'Diana', 'diana@example.com')
""")
spark.sql("""
    DELETE FROM drls.cdc_demo.users WHERE id = 2
""")
print("Applied changes: +1 insert, -1 delete")

# Read CDC changes since the first snapshot
print("\nCDC changes:")
cdc_df = read_cdc_stream(spark, "drls.cdc_demo.users",
                         starting_snapshot_id=first_snapshot)
cdc_df.show(truncate=False)

# Stream CDC to Ray for processing
print("Streaming CDC to Ray...")
ray_ds = cdc_to_ray(spark, "drls.cdc_demo.users",
                    starting_snapshot_id=first_snapshot)
for row in ray_ds.take(10):
    print(f"  {row}")

# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nCDC pipeline demo complete!")

Step-by-Step

Create a CDC-Enabled Table

spark.sql("""
    CREATE TABLE drls.cdc_demo.users (
        id BIGINT, name STRING, email STRING
    ) USING iceberg
    TBLPROPERTIES ('write.wap.enabled' = 'true')
""")

The write.wap.enabled table property turns on Iceberg's Write-Audit-Publish (WAP) workflow, which this pipeline relies on for CDC tracking.

Insert Initial Data

spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
    (1, 'Alice', 'alice@example.com'),
    (2, 'Bob', 'bob@example.com'),
    (3, 'Charlie', 'charlie@example.com')
""")

Capture the Baseline Snapshot

from drls.iceberg import history

snapshots = history(spark, "drls.cdc_demo.users")
first_snapshot = snapshots["snapshots"][0]["snapshot_id"]

Index 0 selects the first snapshot, the one written by the initial insert. Its snapshot ID is the CDC baseline: changes committed after this point will be captured.
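Indexing with [0] would raise an IndexError if the table had no snapshots yet. The following is a minimal sketch of a guard, assuming history() returns the dict shape used in this example (a "snapshots" list of dicts, each with a "snapshot_id" key). The helper name baseline_snapshot_id is hypothetical and not part of drls, and the sample IDs are made up.

```python
def baseline_snapshot_id(history_result):
    """Return the snapshot_id of the first listed snapshot.

    Assumes the shape used in this walkthrough:
    {"snapshots": [{"snapshot_id": ...}, ...]}.
    """
    snapshots = history_result.get("snapshots", [])
    if not snapshots:
        raise ValueError(
            "table has no snapshots; write data before capturing a baseline"
        )
    return snapshots[0]["snapshot_id"]


# Stand-in for the result of history(spark, "drls.cdc_demo.users").
# The IDs below are invented for illustration only.
sample_history = {"snapshots": [{"snapshot_id": 101}, {"snapshot_id": 102}]}
first_snapshot = baseline_snapshot_id(sample_history)
print(f"First snapshot: {first_snapshot}")
```

In the pipeline itself, the call would be baseline_snapshot_id(history(spark, "drls.cdc_demo.users")).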

Make Changes

# Insert a new row
spark.sql("INSERT INTO drls.cdc_demo.users VALUES (4, 'Diana', 'diana@example.com')")

# Delete a row
spark.sql("DELETE FROM drls.cdc_demo.users WHERE id = 2")

Read CDC Changes

from drls.iceberg import read_cdc_stream

cdc_df = read_cdc_stream(spark, "drls.cdc_demo.users",
                         starting_snapshot_id=first_snapshot)
cdc_df.show(truncate=False)

The CDC DataFrame includes metadata columns:

  • _change_type: one of insert, delete, update_before, update_after
  • _change_ordinal: order within the commit
  • _commit_snapshot_id: which snapshot made the change
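To illustrate how the _change_type column can be consumed, here is a small pure-Python sketch that replays change rows onto an in-memory copy of the table. It assumes each row is a dict containing the table columns plus the metadata columns listed above, that rows arrive in commit order, and that an update_before row precedes its update_after row. The function apply_changes and the sample values are hypothetical, not part of drls.

```python
def apply_changes(state, changes, key="id"):
    """Replay CDC rows onto `state`, a dict mapping key -> row data."""
    for row in changes:
        change_type = row["_change_type"]
        # Strip metadata columns (those starting with "_") from the payload.
        data = {k: v for k, v in row.items() if not k.startswith("_")}
        if change_type in ("insert", "update_after"):
            state[data[key]] = data
        elif change_type in ("delete", "update_before"):
            state.pop(data[key], None)
        else:
            raise ValueError(f"unknown change type: {change_type!r}")
    return state


# Table contents at the baseline snapshot.
state = {
    1: {"id": 1, "name": "Alice", "email": "alice@example.com"},
    2: {"id": 2, "name": "Bob", "email": "bob@example.com"},
    3: {"id": 3, "name": "Charlie", "email": "charlie@example.com"},
}

# Hand-written rows mirroring the insert and delete in this walkthrough.
# Snapshot IDs and ordinals are invented for illustration.
changes = [
    {"id": 4, "name": "Diana", "email": "diana@example.com",
     "_change_type": "insert", "_change_ordinal": 0, "_commit_snapshot_id": 102},
    {"id": 2, "name": "Bob", "email": "bob@example.com",
     "_change_type": "delete", "_change_ordinal": 0, "_commit_snapshot_id": 103},
]

apply_changes(state, changes)
print(sorted(state))
```

Removing the old row on update_before and writing the new one on update_after keeps the replay correct even if an update changes the key column, provided the ordering assumption holds.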

Stream CDC to Ray

from drls.iceberg import cdc_to_ray

ray_ds = cdc_to_ray(spark, "drls.cdc_demo.users",
                    starting_snapshot_id=first_snapshot)
for row in ray_ds.take(10):
    print(row)

cdc_to_ray() converts the CDC feed into a Ray Dataset for downstream processing. This is useful for building real-time pipelines that react to changes.
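One way to react to changes is to route each row to a handler chosen by its change type. The sketch below is pure Python and assumes each row arrives as a dict that includes a _change_type key, which is how Dataset.take() returns rows in recent Ray versions. The route_changes function, the handlers, and the sample rows are hypothetical and only illustrate the pattern.

```python
from collections import defaultdict


def route_changes(rows, handlers):
    """Call the handler registered for each row's _change_type.

    Returns a dict counting how many rows each handler processed.
    Rows whose change type has no registered handler are skipped.
    """
    handled = defaultdict(int)
    for row in rows:
        handler = handlers.get(row["_change_type"])
        if handler is None:
            continue
        handler(row)
        handled[row["_change_type"]] += 1
    return dict(handled)


# Example side effects: collect emails of new users and IDs of removed users.
new_emails, removed_ids = [], []
handlers = {
    "insert": lambda row: new_emails.append(row["email"]),
    "delete": lambda row: removed_ids.append(row["id"]),
}

# Stand-in for ray_ds.take(10); the values mirror this walkthrough's changes.
sample_rows = [
    {"id": 4, "name": "Diana", "email": "diana@example.com", "_change_type": "insert"},
    {"id": 2, "name": "Bob", "email": "bob@example.com", "_change_type": "delete"},
]

counts = route_changes(sample_rows, handlers)
print(counts, new_emails, removed_ids)
```

With the real feed, ray_ds.take(10) would be passed in place of sample_rows.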