CDC Walkthrough
File: examples/cdc_pipeline.py
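This example demonstrates Change Data Capture (CDC) with Iceberg: it commits row-level changes to a table, reads them back as a change feed, and streams that feed to Ray. The script itself performs inserts and one delete; it does not run an UPDATE.
Full Source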
"""CDC (Change Data Capture) pipeline with Iceberg.
Demonstrates:
1. Creating a table and inserting data
2. Making changes (an insert and a delete)
3. Reading the CDC feed
4. Streaming CDC to Ray for processing
Run:
pip install drls
python examples/cdc_pipeline.py
"""
import ray
import drls
from drls.iceberg import read_cdc_stream, cdc_to_ray
ray.init()
spark = drls.init_spark(
    app_name="cdc-pipeline",
    num_executors=2,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)
# Create a table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.cdc_demo")
spark.sql("DROP TABLE IF EXISTS drls.cdc_demo.users")
spark.sql("""
    CREATE TABLE drls.cdc_demo.users (
        id BIGINT,
        name STRING,
        email STRING
    ) USING iceberg
    TBLPROPERTIES (
        'write.wap.enabled' = 'true'
    )
""")
# Insert initial data
spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
        (1, 'Alice', 'alice@example.com'),
        (2, 'Bob', 'bob@example.com'),
        (3, 'Charlie', 'charlie@example.com')
""")
print("Inserted 3 users")
# Get the first snapshot id for CDC baseline
from drls.iceberg import history
snapshots = history(spark, "drls.cdc_demo.users")
first_snapshot = snapshots["snapshots"][0]["snapshot_id"]
print(f"First snapshot: {first_snapshot}")
# Make changes
spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
        (4, 'Diana', 'diana@example.com')
""")
spark.sql("""
    DELETE FROM drls.cdc_demo.users WHERE id = 2
""")
print("Applied changes: +1 insert, -1 delete")
# Read CDC changes since the first snapshot
print("\nCDC changes:")
cdc_df = read_cdc_stream(spark, "drls.cdc_demo.users",
                         starting_snapshot_id=first_snapshot)
cdc_df.show(truncate=False)
# Stream CDC to Ray for processing
print("Streaming CDC to Ray...")
ray_ds = cdc_to_ray(spark, "drls.cdc_demo.users",
                    starting_snapshot_id=first_snapshot)
for row in ray_ds.take(10):
    print(f" {row}")
# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nCDC pipeline demo complete!")
Step-by-Step
Create a CDC-Enabled Table
spark.sql("""
    CREATE TABLE drls.cdc_demo.users (
        id BIGINT, name STRING, email STRING
    ) USING iceberg
    TBLPROPERTIES ('write.wap.enabled' = 'true')
""")
The write.wap.enabled property turns on Iceberg's Write-Audit-Publish (WAP) workflow for the table. This walkthrough sets it as a prerequisite for CDC tracking.
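When the table definition is generated rather than hand-written, the statement can be assembled from a column list and a property map. The sketch below is one possible way to do that. `build_create_table_sql` is a hypothetical helper, not part of drls, and it does no identifier quoting or escaping.

```python
def build_create_table_sql(table, columns, properties):
    """Assemble an Iceberg CREATE TABLE statement.

    Hypothetical helper for illustration; not part of drls.
    columns: list of (name, type) pairs.
    properties: dict of table properties.
    No quoting or escaping is done, so only pass trusted identifiers.
    """
    column_sql = ", ".join(f"{name} {col_type}" for name, col_type in columns)
    statement = f"CREATE TABLE {table} ({column_sql}) USING iceberg"
    if properties:
        props_sql = ", ".join(
            f"'{key}' = '{value}'" for key, value in properties.items()
        )
        statement += f" TBLPROPERTIES ({props_sql})"
    return statement


ddl = build_create_table_sql(
    "drls.cdc_demo.users",
    [("id", "BIGINT"), ("name", "STRING"), ("email", "STRING")],
    {"write.wap.enabled": "true"},
)
print(ddl)
```

The resulting string can be passed to spark.sql() in place of the literal statement above.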
Insert Initial Data
spark.sql("""
    INSERT INTO drls.cdc_demo.users VALUES
        (1, 'Alice', 'alice@example.com'),
        (2, 'Bob', 'bob@example.com'),
        (3, 'Charlie', 'charlie@example.com')
""")
Capture the Baseline Snapshot
from drls.iceberg import history
snapshots = history(spark, "drls.cdc_demo.users")
first_snapshot = snapshots["snapshots"][0]["snapshot_id"]
This snapshot ID marks the baseline for CDC: only changes committed after this snapshot will be captured.
Make Changes
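Indexing `[0]` fails with an IndexError if the table has no snapshots yet. A small guard makes that case explicit. The sketch below assumes the result shape used in this walkthrough (a dict with a "snapshots" list whose entries carry "snapshot_id") and also assumes the list is ordered oldest first. `baseline_snapshot_id` is a hypothetical helper, and the sample ids are made up.

```python
def baseline_snapshot_id(history_result):
    """Return the id of the first snapshot listed in a history() result.

    Assumes {"snapshots": [{"snapshot_id": ...}, ...]}, oldest snapshot first.
    Hypothetical helper for illustration; not part of drls.
    """
    snapshots = history_result.get("snapshots", [])
    if not snapshots:
        raise ValueError("table has no snapshots yet; write data before starting CDC")
    return snapshots[0]["snapshot_id"]


# Stand-in for history(spark, "drls.cdc_demo.users"); the ids are invented.
sample_history = {"snapshots": [{"snapshot_id": 101}, {"snapshot_id": 102}]}
print(baseline_snapshot_id(sample_history))
```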
# Insert a new row
spark.sql("INSERT INTO drls.cdc_demo.users VALUES (4, 'Diana', 'diana@example.com')")
# Delete a row
spark.sql("DELETE FROM drls.cdc_demo.users WHERE id = 2")
Read CDC Changes
from drls.iceberg import read_cdc_stream
cdc_df = read_cdc_stream(spark, "drls.cdc_demo.users",
                         starting_snapshot_id=first_snapshot)
cdc_df.show(truncate=False)
The CDC DataFrame includes metadata columns:
- _change_type: insert, delete, update_before, or update_after
- _change_ordinal: order within the commit
- _commit_snapshot_id: which snapshot made the change
Stream CDC to Ray
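To see how a consumer might use `_change_type`, the sketch below replays change rows onto an in-memory copy of the table keyed by id. It is an illustration only: the rows are hand-written stand-ins shaped like the columns listed above, not real output from read_cdc_stream(), the snapshot ids are invented, and the rows are assumed to arrive already in commit order. `apply_changes` is a hypothetical helper.

```python
def apply_changes(state, changes):
    """Replay CDC rows, in the order given, onto a dict keyed by id."""
    result = dict(state)
    for row in changes:
        change_type = row["_change_type"]
        # Drop the metadata columns so only table columns are stored.
        data = {k: v for k, v in row.items() if not k.startswith("_")}
        if change_type in ("insert", "update_after"):
            result[data["id"]] = data
        elif change_type in ("delete", "update_before"):
            result.pop(data["id"], None)
        else:
            raise ValueError(f"unknown change type: {change_type}")
    return result


# Table contents at the baseline snapshot.
baseline = {
    1: {"id": 1, "name": "Alice", "email": "alice@example.com"},
    2: {"id": 2, "name": "Bob", "email": "bob@example.com"},
    3: {"id": 3, "name": "Charlie", "email": "charlie@example.com"},
}

# Hand-written stand-ins for the change rows produced by the two commits.
changes = [
    {"id": 4, "name": "Diana", "email": "diana@example.com",
     "_change_type": "insert", "_change_ordinal": 0, "_commit_snapshot_id": 102},
    {"id": 2, "name": "Bob", "email": "bob@example.com",
     "_change_type": "delete", "_change_ordinal": 0, "_commit_snapshot_id": 103},
]

final_state = apply_changes(baseline, changes)
print(sorted(final_state))
```

An update appears as an update_before row followed by an update_after row, so the same loop removes the old image and then stores the new one.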
from drls.iceberg import cdc_to_ray
ray_ds = cdc_to_ray(spark, "drls.cdc_demo.users",
                    starting_snapshot_id=first_snapshot)
for row in ray_ds.take(10):
    print(row)
cdc_to_ray() converts the CDC feed into a Ray Dataset for downstream processing, which is useful for building pipelines that react to changes soon after they are committed.
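One way to react to changes is to route each row to a handler chosen by its `_change_type`. The sketch below is plain Python and runs without Ray. It assumes each row is a dict that includes the `_change_type` column, as the rows from `ray_ds.take(10)` would be if the dataset keeps the CDC metadata columns. `dispatch_changes` and the sample rows are hypothetical.

```python
def dispatch_changes(rows, handlers):
    """Call the handler registered for each row's _change_type.

    Rows whose change type has no handler are skipped.
    Returns the number of rows that were handled.
    """
    handled = 0
    for row in rows:
        handler = handlers.get(row["_change_type"])
        if handler is None:
            continue  # no handler registered for this change type
        handler(row)
        handled += 1
    return handled


created, removed = [], []
handlers = {
    "insert": lambda row: created.append(row["id"]),
    "delete": lambda row: removed.append(row["id"]),
}

# Hand-written stand-ins for rows taken from the Ray Dataset.
sample_rows = [
    {"id": 4, "name": "Diana", "email": "diana@example.com", "_change_type": "insert"},
    {"id": 2, "name": "Bob", "email": "bob@example.com", "_change_type": "delete"},
]

handled = dispatch_changes(sample_rows, handlers)
print(handled, created, removed)
```

In the walkthrough, the call would become `dispatch_changes(ray_ds.take(10), handlers)`.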