Time Travel Walkthrough

File: examples/time_travel.py

This example demonstrates Iceberg's time travel capabilities: snapshot history, point-in-time queries, and rollback.

Full Source

"""Iceberg time travel demo.

Demonstrates:
1. Creating a table with multiple snapshots
2. Viewing snapshot history
3. Querying historical data via time travel
4. Rolling back to a previous snapshot

Run:
    pip install drls
    python examples/time_travel.py
"""
import ray
import drls
from drls.iceberg import history, snapshot_at, rollback_to

ray.init()

spark = drls.init_spark(
    app_name="time-travel",
    num_executors=1,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Create table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.travel_demo")
spark.sql("DROP TABLE IF EXISTS drls.travel_demo.metrics")
spark.sql("""
    CREATE TABLE drls.travel_demo.metrics (
        ts TIMESTAMP,
        metric STRING,
        value DOUBLE
    ) USING iceberg
""")

# Insert data in waves to create multiple snapshots
import time

for i in range(3):
    spark.sql(f"""
        INSERT INTO drls.travel_demo.metrics VALUES
        (current_timestamp(), 'cpu', {50 + i * 10}.0),
        (current_timestamp(), 'memory', {60 + i * 5}.0)
    """)
    print(f"Snapshot {i+1}: inserted 2 rows")
    time.sleep(1)  # small delay so timestamps differ

# View history
print("\n--- Snapshot History ---")
h = history(spark, "drls.travel_demo.metrics")
for snap in h["snapshots"]:
    print(f"  ID: {snap['snapshot_id']}, Time: {snap['committed_at']}, "
          f"Operation: {snap['operation']}")

# Query current state
current_count = spark.table("drls.travel_demo.metrics").count()
print(f"\nCurrent row count: {current_count}")  # 6

# Time-travel to the first snapshot
first_snap_id = h["snapshots"][0]["snapshot_id"]
print(f"\n--- Time travel to snapshot {first_snap_id} ---")
old_df = snapshot_at(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Row count at first snapshot: {old_df.count()}")  # 2
old_df.show()

# Rollback to first snapshot
print(f"\n--- Rolling back to snapshot {first_snap_id} ---")
result = rollback_to(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Rollback result: {result}")

rolled_back_count = spark.table("drls.travel_demo.metrics").count()
print(f"Row count after rollback: {rolled_back_count}")  # 2

# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nTime travel demo complete!")

Step-by-Step

Create Multiple Snapshots

import time

for i in range(3):
    spark.sql(f"""
        INSERT INTO drls.travel_demo.metrics VALUES
        (current_timestamp(), 'cpu', {50 + i * 10}.0),
        (current_timestamp(), 'memory', {60 + i * 5}.0)
    """)
    time.sleep(1)

Each INSERT commits a new Iceberg snapshot. After three inserts, the table has 3 snapshots, each adding 2 rows, so the current table holds 6 rows.
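To make the row arithmetic concrete, here is a minimal sketch that models append-only snapshots in plain Python, with no Spark or drls involved. The `Snapshot` class and `append` helper are hypothetical names used only for illustration; they mimic the idea that each commit captures the full table state at that point.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Toy stand-in for a table snapshot (not an Iceberg or drls type)."""
    snapshot_id: int
    rows: tuple  # full table contents as of this commit


def append(snapshots, new_rows):
    """Return a new list with one extra snapshot that adds new_rows."""
    previous = snapshots[-1].rows if snapshots else ()
    snap = Snapshot(snapshot_id=len(snapshots) + 1,
                    rows=previous + tuple(new_rows))
    return snapshots + [snap]


snapshots = []
for i in range(3):
    snapshots = append(snapshots, [("cpu", 50.0 + i * 10),
                                   ("memory", 60.0 + i * 5)])

# Rows visible at each snapshot: every commit adds 2 rows to the previous state.
row_counts = [len(s.rows) for s in snapshots]
print(row_counts)  # [2, 4, 6]
```

Reading the first snapshot yields 2 rows and reading the latest yields 6, which matches the counts in the walkthrough.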

View Snapshot History

from drls.iceberg import history

h = history(spark, "drls.travel_demo.metrics")
for snap in h["snapshots"]:
    print(f"  ID: {snap['snapshot_id']}, Time: {snap['committed_at']}")

The "snapshots" list in the result has one entry per snapshot, each with its snapshot_id, committed_at timestamp, and operation.
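The walkthrough treats the first entry of the list as the oldest snapshot. If you would rather not rely on the returned order, a small helper can sort the entries explicitly. This is a sketch under assumptions: `oldest_first` is a hypothetical helper, the sample entries and their IDs are made up, and `committed_at` is assumed to hold comparable values such as `datetime` objects.

```python
from datetime import datetime


def oldest_first(snapshots):
    """Return snapshot entries sorted by commit time, oldest first."""
    return sorted(snapshots, key=lambda snap: snap["committed_at"])


# Hypothetical entries shaped like the ones printed above (IDs are made up).
sample = {"snapshots": [
    {"snapshot_id": 302, "committed_at": datetime(2024, 1, 1, 12, 0, 2),
     "operation": "append"},
    {"snapshot_id": 101, "committed_at": datetime(2024, 1, 1, 12, 0, 0),
     "operation": "append"},
    {"snapshot_id": 207, "committed_at": datetime(2024, 1, 1, 12, 0, 1),
     "operation": "append"},
]}

ordered = oldest_first(sample["snapshots"])
oldest_id = ordered[0]["snapshot_id"]
print(oldest_id)  # 101
```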

Time-Travel Query

from drls.iceberg import snapshot_at

first_snap_id = h["snapshots"][0]["snapshot_id"]
old_df = snapshot_at(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Row count at first snapshot: {old_df.count()}")  # 2

snapshot_at() returns a DataFrame of the table as it was at that snapshot. Here that is only the 2 rows from the first insert, even though the current table holds 6.
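For a point-in-time query you first need the ID of the snapshot that applies to a given moment. The sketch below picks the latest snapshot committed at or before a timestamp from entries shaped like the history output; the resulting ID could then be passed as `snapshot_id` to snapshot_at(). `snapshot_id_as_of` is a hypothetical helper, the sample data is made up, and the lookup ignores rollbacks, which change the current snapshot without changing commit times.

```python
from datetime import datetime


def snapshot_id_as_of(snapshots, when):
    """Return the ID of the latest snapshot committed at or before `when`.

    Returns None if no snapshot had been committed yet at that time.
    """
    eligible = [s for s in snapshots if s["committed_at"] <= when]
    if not eligible:
        return None
    return max(eligible, key=lambda s: s["committed_at"])["snapshot_id"]


# Hypothetical history entries (IDs and times are made up).
entries = [
    {"snapshot_id": 101, "committed_at": datetime(2024, 1, 1, 12, 0, 0)},
    {"snapshot_id": 207, "committed_at": datetime(2024, 1, 1, 12, 0, 1)},
    {"snapshot_id": 302, "committed_at": datetime(2024, 1, 1, 12, 0, 2)},
]

between = snapshot_id_as_of(entries, datetime(2024, 1, 1, 12, 0, 1, 500000))
too_early = snapshot_id_as_of(entries, datetime(2023, 12, 31))
print(between)    # 207
print(too_early)  # None
```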

Rollback

from drls.iceberg import rollback_to

result = rollback_to(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
rolled_back_count = spark.table("drls.travel_demo.metrics").count()  # 2

After rollback, the current table state matches the first snapshot. The rollback is itself recorded in the table's history rather than erasing the later snapshots, so the history is preserved.
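A row count of 2 is a weak check, since two different tables can have the same count. One way to confirm that the rolled-back table really matches the snapshot is to compare the rows themselves, ignoring order. The sketch below does this on plain tuples; `same_rows` is a hypothetical helper. It should also work on the lists returned by PySpark's `DataFrame.collect()`, since `Row` objects are hashable tuples, but that use is an assumption and is shown only in a comment.

```python
from collections import Counter


def same_rows(rows_a, rows_b):
    """True if both collections hold the same rows (with duplicates), in any order."""
    return Counter(rows_a) == Counter(rows_b)


# Made-up rows standing in for collected DataFrame contents.
first_snapshot_rows = [("cpu", 50.0), ("memory", 60.0)]
after_rollback_rows = [("memory", 60.0), ("cpu", 50.0)]
before_rollback_rows = first_snapshot_rows + [("cpu", 60.0), ("memory", 65.0),
                                              ("cpu", 70.0), ("memory", 70.0)]

matches_after = same_rows(after_rollback_rows, first_snapshot_rows)
matches_before = same_rows(before_rollback_rows, first_snapshot_rows)
print(matches_after, matches_before)  # True False

# With Spark (assumed usage, not run here):
# same_rows(spark.table("drls.travel_demo.metrics").collect(), old_df.collect())
```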