Change Data Capture (CDC)¶
DRLS supports reading Iceberg CDC (Change Data Capture) feeds and streaming changes to Ray for processing.
Read CDC Stream¶
Create a streaming DataFrame from Iceberg CDC changes:
```python
from drls.iceberg import read_cdc_stream

cdc_df = read_cdc_stream(
    spark,
    "drls.db.users",
    starting_snapshot_id=1234567890,  # Optional: start from a specific snapshot
)
cdc_df.show(truncate=False)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | — | Active Spark session |
| `table` | `str` | — | Fully qualified table name |
| `starting_snapshot_id` | `int \| None` | `None` | Start reading CDC from this snapshot |
The returned DataFrame includes Iceberg CDC metadata columns:
| Column | Description |
|---|---|
| `_change_type` | `insert`, `delete`, `update_before`, `update_after` |
| `_change_ordinal` | Order of the change within the commit |
| `_commit_snapshot_id` | Snapshot ID where the change occurred |
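As an illustration of how these metadata columns are typically consumed, the sketch below groups changes by `_change_type` in plain Python (the row dicts here are hypothetical sample data shaped like collected CDC rows; this is not part of the DRLS API):

```python
from collections import Counter

# Hypothetical CDC rows, shaped like collected DataFrame rows with the
# Iceberg CDC metadata columns described above.
cdc_rows = [
    {"id": 4, "name": "Diana", "_change_type": "insert", "_change_ordinal": 0},
    {"id": 2, "name": "Bob", "_change_type": "delete", "_change_ordinal": 1},
    # An update surfaces as a pre-image/post-image pair sharing one ordinal.
    {"id": 1, "name": "Alice", "_change_type": "update_before", "_change_ordinal": 2},
    {"id": 1, "name": "Alicia", "_change_type": "update_after", "_change_ordinal": 2},
]

# Tally changes by type.
counts = Counter(row["_change_type"] for row in cdc_rows)
print(dict(counts))
```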
Stream CDC to Ray¶
Stream CDC changes directly to a Ray Dataset for processing:
```python
from drls.iceberg import cdc_to_ray

ray_ds = cdc_to_ray(
    spark,
    "drls.db.users",
    starting_snapshot_id=1234567890,
)

for row in ray_ds.take(10):
    print(row)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | — | Active Spark session |
| `table` | `str` | — | Fully qualified table name |
| `starting_snapshot_id` | `int \| None` | `None` | Start from this snapshot |
Returns: `ray.data.Dataset` with CDC rows.
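One common downstream pattern is replaying the CDC rows in `_change_ordinal` order to materialize the table's current state. The sketch below shows this in plain Python over already-collected rows (the `apply_cdc` helper and the sample rows are illustrative, not part of the DRLS API):

```python
def apply_cdc(state: dict, rows: list[dict]) -> dict:
    """Replay CDC rows onto a keyed state dict (id -> row payload)."""
    for row in sorted(rows, key=lambda r: r["_change_ordinal"]):
        # Strip the CDC metadata columns, keeping only the data payload.
        payload = {k: v for k, v in row.items() if not k.startswith("_change")}
        if row["_change_type"] in ("insert", "update_after"):
            state[row["id"]] = payload
        elif row["_change_type"] == "delete":
            state.pop(row["id"], None)
        # update_before carries the pre-image; nothing to apply for it.
    return state

state = apply_cdc({}, [
    {"id": 1, "name": "Alice", "_change_type": "insert", "_change_ordinal": 0},
    {"id": 1, "name": "Alicia", "_change_type": "update_after", "_change_ordinal": 1},
    {"id": 2, "name": "Bob", "_change_type": "insert", "_change_ordinal": 2},
    {"id": 2, "name": "Bob", "_change_type": "delete", "_change_ordinal": 3},
])
print(state)  # {1: {'id': 1, 'name': 'Alicia'}}
```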
Example¶
"""CDC (Change Data Capture) pipeline with Iceberg.
Demonstrates:
1. Creating a table and inserting data
2. Making changes (inserts, updates, deletes)
3. Reading the CDC feed
4. Streaming CDC to Ray for processing
Run:
pip install drls
python examples/cdc_pipeline.py
"""
import ray
import drls
from drls.iceberg import read_cdc_stream, cdc_to_ray
ray.init()
spark = drls.init_spark(
app_name="cdc-pipeline",
num_executors=2,
executor_cores=1,
executor_memory="512M",
iceberg_catalog="hadoop",
iceberg_warehouse="/tmp/drls-warehouse",
)
# Create a table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.cdc_demo")
spark.sql("DROP TABLE IF EXISTS drls.cdc_demo.users")
spark.sql("""
CREATE TABLE drls.cdc_demo.users (
id BIGINT,
name STRING,
email STRING
) USING iceberg
TBLPROPERTIES (
'write.wap.enabled' = 'true'
)
""")
# Insert initial data
spark.sql("""
INSERT INTO drls.cdc_demo.users VALUES
(1, 'Alice', 'alice@example.com'),
(2, 'Bob', 'bob@example.com'),
(3, 'Charlie', 'charlie@example.com')
""")
print("Inserted 3 users")
# Get the first snapshot id for CDC baseline
from drls.iceberg import history
snapshots = history(spark, "drls.cdc_demo.users")
first_snapshot = snapshots["snapshots"][0]["snapshot_id"]
print(f"First snapshot: {first_snapshot}")
# Make changes
spark.sql("""
INSERT INTO drls.cdc_demo.users VALUES
(4, 'Diana', 'diana@example.com')
""")
spark.sql("""
DELETE FROM drls.cdc_demo.users WHERE id = 2
""")
print("Applied changes: +1 insert, -1 delete")
# Read CDC changes since the first snapshot
print("\nCDC changes:")
cdc_df = read_cdc_stream(spark, "drls.cdc_demo.users",
starting_snapshot_id=first_snapshot)
cdc_df.show(truncate=False)
# Stream CDC to Ray for processing
print("Streaming CDC to Ray...")
ray_ds = cdc_to_ray(spark, "drls.cdc_demo.users",
starting_snapshot_id=first_snapshot)
for row in ray_ds.take(10):
print(f" {row}")
# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nCDC pipeline demo complete!")
Table Properties¶
To enable CDC on an Iceberg table, set the WAP (Write-Audit-Publish) property: