Time Travel Walkthrough¶
File: examples/time_travel.py
This example demonstrates Iceberg's time travel capabilities: snapshot history, point-in-time queries, and rollback.
Full Source¶
"""Iceberg time travel demo.
Demonstrates:
1. Creating a table with multiple snapshots
2. Viewing snapshot history
3. Querying historical data via time travel
4. Rolling back to a previous snapshot
Run:
pip install drls
python examples/time_travel.py
"""
import ray
import drls
from drls.iceberg import history, snapshot_at, rollback_to
ray.init()
spark = drls.init_spark(
    app_name="time-travel",
    num_executors=1,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)
# Create table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.travel_demo")
spark.sql("DROP TABLE IF EXISTS drls.travel_demo.metrics")
spark.sql("""
    CREATE TABLE drls.travel_demo.metrics (
        ts TIMESTAMP,
        metric STRING,
        value DOUBLE
    ) USING iceberg
""")
# Insert data in waves to create multiple snapshots
import time

for i in range(3):
    spark.sql(f"""
        INSERT INTO drls.travel_demo.metrics VALUES
        (current_timestamp(), 'cpu', {50 + i * 10}.0),
        (current_timestamp(), 'memory', {60 + i * 5}.0)
    """)
    print(f"Snapshot {i + 1}: inserted 2 rows")
    time.sleep(1)  # small delay so timestamps differ
# View history
print("\n--- Snapshot History ---")
h = history(spark, "drls.travel_demo.metrics")
for snap in h["snapshots"]:
    print(f"  ID: {snap['snapshot_id']}, Time: {snap['committed_at']}, "
          f"Operation: {snap['operation']}")
# Query current state
current_count = spark.table("drls.travel_demo.metrics").count()
print(f"\nCurrent row count: {current_count}") # 6
# Time-travel to the first snapshot
first_snap_id = h["snapshots"][0]["snapshot_id"]
print(f"\n--- Time travel to snapshot {first_snap_id} ---")
old_df = snapshot_at(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Row count at first snapshot: {old_df.count()}") # 2
old_df.show()
# Rollback to first snapshot
print(f"\n--- Rolling back to snapshot {first_snap_id} ---")
result = rollback_to(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Rollback result: {result}")
rolled_back_count = spark.table("drls.travel_demo.metrics").count()
print(f"Row count after rollback: {rolled_back_count}") # 2
# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nTime travel demo complete!")
Step-by-Step¶
Create Multiple Snapshots¶
for i in range(3):
    spark.sql(f"""
        INSERT INTO drls.travel_demo.metrics VALUES
        (current_timestamp(), 'cpu', {50 + i * 10}.0),
        (current_timestamp(), 'memory', {60 + i * 5}.0)
    """)
    time.sleep(1)
Each INSERT commit creates a new Iceberg snapshot. After three inserts the table has 3 snapshots; each insert adds 2 rows, so the snapshots capture 2, 4, and 6 rows cumulatively.
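To make the arithmetic explicit, here is a tiny sketch (plain Python, no Spark needed) of how the visible row count grows with each snapshot:

```python
# Each INSERT wave writes one 'cpu' row and one 'memory' row.
rows_per_insert = 2
waves = 3

# Cumulative row count visible at each successive snapshot.
counts_at_snapshot = [rows_per_insert * (i + 1) for i in range(waves)]
print(counts_at_snapshot)  # [2, 4, 6]
```

Time-traveling to snapshot N therefore shows the first 2·N rows, and the current table shows all 6.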
View Snapshot History¶
from drls.iceberg import history
h = history(spark, "drls.travel_demo.metrics")
for snap in h["snapshots"]:
    print(f"  ID: {snap['snapshot_id']}, Time: {snap['committed_at']}")
The history shows all snapshots with their IDs, timestamps, and operations.
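The walkthrough then treats index 0 of `h["snapshots"]` as the oldest snapshot. If you would rather not rely on list order, you can select the earliest commit explicitly. A small sketch, assuming the dict shape shown above (the IDs and timestamps here are made up):

```python
# Hypothetical history payload, shaped like the one printed above.
h = {
    "snapshots": [
        {"snapshot_id": 101, "committed_at": "2024-01-01T00:00:01", "operation": "append"},
        {"snapshot_id": 102, "committed_at": "2024-01-01T00:00:02", "operation": "append"},
        {"snapshot_id": 103, "committed_at": "2024-01-01T00:00:03", "operation": "append"},
    ]
}

# Pick the earliest commit regardless of list order.
first = min(h["snapshots"], key=lambda s: s["committed_at"])
print(first["snapshot_id"])  # 101
```

ISO-8601 timestamps sort correctly as strings, which is what makes the `key=` comparison above safe.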
Time-Travel Query¶
from drls.iceberg import snapshot_at
first_snap_id = h["snapshots"][0]["snapshot_id"]
old_df = snapshot_at(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Row count at first snapshot: {old_df.count()}") # 2
snapshot_at() returns a DataFrame of the table as it stood at that snapshot: only the 2 rows from the first insert.
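snapshot_at() is drls's convenience wrapper; Iceberg tables also support time travel directly in Spark SQL via `VERSION AS OF` (by snapshot ID) and `TIMESTAMP AS OF` (by wall-clock time). A sketch of the equivalent queries against this demo's table, using a made-up snapshot ID:

```python
snapshot_id = 1234567890  # substitute a real ID from the history listing

# Query the table as of a specific snapshot ID.
by_id = f"SELECT * FROM drls.travel_demo.metrics VERSION AS OF {snapshot_id}"

# Or as of a point in time (Iceberg resolves it to the snapshot current then).
by_time = ("SELECT * FROM drls.travel_demo.metrics "
           "TIMESTAMP AS OF '2024-01-01 00:00:00'")

# old_df = spark.sql(by_id)  # run with the Spark session from this demo
```

Either form is handy when you are in a SQL-only context (e.g. spark-sql or a BI tool) and the drls helpers are not available.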
Rollback¶
from drls.iceberg import rollback_to
result = rollback_to(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
rolled_back_count = spark.table("drls.travel_demo.metrics").count() # 2
After rollback, the current table state matches the first snapshot. The rollback is itself recorded as a new entry in the table history, so earlier snapshots remain available.
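If you need to roll back without the drls helper, Iceberg ships a `rollback_to_snapshot` stored procedure that does the same thing. A sketch of the CALL statement, using the `drls` catalog configured above and a made-up snapshot ID:

```python
snapshot_id = 1234567890  # substitute a real ID from the history listing

# Iceberg's built-in stored procedure; 'drls' is the catalog name from init_spark.
call_sql = (f"CALL drls.system.rollback_to_snapshot("
            f"'travel_demo.metrics', {snapshot_id})")

# spark.sql(call_sql)  # run with the Spark session from this demo
```

Because rollback only moves the table's current-snapshot pointer, it is a metadata-only operation: no data files are rewritten or deleted.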