# Time Travel
Iceberg maintains a snapshot history, allowing you to query data at any point in time or roll back to a previous state.
## View Snapshot History
Returns:
```python
{
    "success": True,
    "table": "drls.db.events",
    "snapshots": [
        {
            "snapshot_id": 1234567890,
            "committed_at": "2024-01-15T10:30:00Z",
            "operation": "append",
            "parent_id": None,
            "is_current_ancestor": True,
        },
        {
            "snapshot_id": 2345678901,
            "committed_at": "2024-01-15T11:00:00Z",
            "operation": "overwrite",
            "parent_id": 1234567890,
            "is_current_ancestor": True,
        },
    ],
}
```
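The payload above is plain Python data, so it can be inspected without any drls helpers. As one example, here is a minimal sketch that picks out the most recent snapshot on the current lineage (the `latest_snapshot` helper is illustrative, not part of the drls API; it assumes the shape shown above, with ISO-8601 `committed_at` strings):

```python
def latest_snapshot(history_payload):
    """Return the most recent snapshot on the current table lineage.

    Relies on "committed_at" being ISO-8601 in a single time zone,
    so lexicographic comparison matches chronological order.
    """
    ancestors = [s for s in history_payload["snapshots"]
                 if s["is_current_ancestor"]]
    return max(ancestors, key=lambda s: s["committed_at"])

# Same payload shape as the example above
payload = {
    "success": True,
    "table": "drls.db.events",
    "snapshots": [
        {"snapshot_id": 1234567890, "committed_at": "2024-01-15T10:30:00Z",
         "operation": "append", "parent_id": None, "is_current_ancestor": True},
        {"snapshot_id": 2345678901, "committed_at": "2024-01-15T11:00:00Z",
         "operation": "overwrite", "parent_id": 1234567890,
         "is_current_ancestor": True},
    ],
}
print(latest_snapshot(payload)["snapshot_id"])  # 2345678901
```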
## Query at a Snapshot
Read the table as it was at a specific snapshot:
```python
from drls.iceberg import snapshot_at

# By snapshot ID
df = snapshot_at(spark, "drls.db.events", snapshot_id=1234567890)
df.show()

# By timestamp
df = snapshot_at(spark, "drls.db.events", timestamp="2024-01-15T10:30:00")
df.show()
```
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `spark` | `SparkSession` | Active Spark session |
| `table` | `str` | Fully qualified table name |
| `snapshot_id` | `int \| None` | Target snapshot ID |
| `timestamp` | `str \| None` | Target point-in-time (ISO 8601) |
Note
Provide exactly one of snapshot_id or timestamp.
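This "exactly one of" constraint is an exclusive-or on the two arguments. As a sketch of the kind of guard such a function might apply (the `validate_target` helper is hypothetical, not part of the drls API):

```python
def validate_target(snapshot_id=None, timestamp=None):
    """Raise unless exactly one of snapshot_id / timestamp is given."""
    # Both None or both set -> the equality below is True -> reject.
    if (snapshot_id is None) == (timestamp is None):
        raise ValueError("provide exactly one of snapshot_id or timestamp")

validate_target(snapshot_id=1234567890)           # ok
validate_target(timestamp="2024-01-15T10:30:00")  # ok
# validate_target()                 # would raise ValueError
# validate_target(1, "2024-01-15")  # would also raise ValueError
```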
Returns: pyspark.sql.DataFrame — the table at the specified point in time.
## Rollback
Roll back the table to a previous snapshot. This creates a new snapshot identical to the target:
```python
from drls.iceberg import rollback_to

result = rollback_to(spark, "drls.db.events", snapshot_id=1234567890)
```
Returns:
Warning
Rollback creates a new snapshot — it does not delete history. The table will have a new current snapshot that points to the same data as the target snapshot.
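To make the warning concrete, here is a toy model of a snapshot log using plain dicts (this is an illustration of the semantics, not the drls or Iceberg API): rolling back appends a new entry whose data pointer equals the target's, so the history only ever grows.

```python
# Two existing snapshots: the second overwrote the first.
snapshots = [
    {"id": 1, "parent": None, "data": "files-a"},
    {"id": 2, "parent": 1, "data": "files-b"},
]

def rollback(log, target_id):
    """Append a new snapshot that reuses the target snapshot's data."""
    target = next(s for s in log if s["id"] == target_id)
    new = {"id": log[-1]["id"] + 1,
           "parent": log[-1]["id"],
           "data": target["data"]}
    log.append(new)
    return new

rollback(snapshots, 1)
print(len(snapshots))         # 3 -- nothing was deleted
print(snapshots[-1]["data"])  # files-a -- current state matches snapshot 1
```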
## Example
"""Iceberg time travel demo.
Demonstrates:
1. Creating a table with multiple snapshots
2. Viewing snapshot history
3. Querying historical data via time travel
4. Rolling back to a previous snapshot
Run:
pip install drls
python examples/time_travel.py
"""
import ray
import drls
from drls.iceberg import history, snapshot_at, rollback_to
ray.init()
spark = drls.init_spark(
app_name="time-travel",
num_executors=1,
executor_cores=1,
executor_memory="512M",
iceberg_catalog="hadoop",
iceberg_warehouse="/tmp/drls-warehouse",
)
# Create table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.travel_demo")
spark.sql("DROP TABLE IF EXISTS drls.travel_demo.metrics")
spark.sql("""
CREATE TABLE drls.travel_demo.metrics (
ts TIMESTAMP,
metric STRING,
value DOUBLE
) USING iceberg
""")
# Insert data in waves to create multiple snapshots
import time
for i in range(3):
spark.sql(f"""
INSERT INTO drls.travel_demo.metrics VALUES
(current_timestamp(), 'cpu', {50 + i * 10}.0),
(current_timestamp(), 'memory', {60 + i * 5}.0)
""")
print(f"Snapshot {i+1}: inserted 2 rows")
time.sleep(1) # small delay so timestamps differ
# View history
print("\n--- Snapshot History ---")
h = history(spark, "drls.travel_demo.metrics")
for snap in h["snapshots"]:
print(f" ID: {snap['snapshot_id']}, Time: {snap['committed_at']}, "
f"Operation: {snap['operation']}")
# Query current state
current_count = spark.table("drls.travel_demo.metrics").count()
print(f"\nCurrent row count: {current_count}") # 6
# Time-travel to the first snapshot
first_snap_id = h["snapshots"][0]["snapshot_id"]
print(f"\n--- Time travel to snapshot {first_snap_id} ---")
old_df = snapshot_at(spark, "drls.travel_demo.metrics",
snapshot_id=first_snap_id)
print(f"Row count at first snapshot: {old_df.count()}") # 2
old_df.show()
# Rollback to first snapshot
print(f"\n--- Rolling back to snapshot {first_snap_id} ---")
result = rollback_to(spark, "drls.travel_demo.metrics",
snapshot_id=first_snap_id)
print(f"Rollback result: {result}")
rolled_back_count = spark.table("drls.travel_demo.metrics").count()
print(f"Row count after rollback: {rolled_back_count}") # 2
# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nTime travel demo complete!")