Skip to content

Time Travel

Iceberg maintains a snapshot history, allowing you to query data at any point in time or roll back to a previous state.

View Snapshot History

from drls.iceberg import history

h = history(spark, "drls.db.events")

Returns:

{
    "success": True,
    "table": "drls.db.events",
    "snapshots": [
        {
            "snapshot_id": 1234567890,
            "committed_at": "2024-01-15T10:30:00Z",
            "operation": "append",
            "parent_id": None,
            "is_current_ancestor": True,
        },
        {
            "snapshot_id": 2345678901,
            "committed_at": "2024-01-15T11:00:00Z",
            "operation": "overwrite",
            "parent_id": 1234567890,
            "is_current_ancestor": True,
        },
    ]
}

CLI equivalent:

drls history drls.db.events

Query at a Snapshot

Read the table as it was at a specific snapshot:

from drls.iceberg import snapshot_at

# By snapshot ID
df = snapshot_at(spark, "drls.db.events", snapshot_id=1234567890)
df.show()

# By timestamp
df = snapshot_at(spark, "drls.db.events", timestamp="2024-01-15T10:30:00")
df.show()

Parameters:

Parameter Type Description
spark SparkSession Active Spark session
table str Fully qualified table name
snapshot_id int \| None Target snapshot ID
timestamp str \| None Target point-in-time (ISO8601)

Note

Provide exactly one of snapshot_id or timestamp.

Returns: pyspark.sql.DataFrame — the table at the specified point in time.

Rollback

Roll back the table to a previous snapshot. This creates a new snapshot identical to the target:

from drls.iceberg import rollback_to

result = rollback_to(spark, "drls.db.events", snapshot_id=1234567890)

Returns:

{
    "success": True,
    "table": "drls.db.events",
    "rolled_back_to": 1234567890,
}

Warning

Rollback creates a new snapshot — it does not delete history. The table will have a new current snapshot that points to the same data as the target snapshot.

Example

"""Iceberg time travel demo.

Demonstrates:
1. Creating a table with multiple snapshots
2. Viewing snapshot history
3. Querying historical data via time travel
4. Rolling back to a previous snapshot

Run:
    pip install drls
    python examples/time_travel.py
"""
import ray
import drls
from drls.iceberg import history, snapshot_at, rollback_to

ray.init()

spark = drls.init_spark(
    app_name="time-travel",
    num_executors=1,
    executor_cores=1,
    executor_memory="512M",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/drls-warehouse",
)

# Create table
spark.sql("CREATE DATABASE IF NOT EXISTS drls.travel_demo")
spark.sql("DROP TABLE IF EXISTS drls.travel_demo.metrics")
spark.sql("""
    CREATE TABLE drls.travel_demo.metrics (
        ts TIMESTAMP,
        metric STRING,
        value DOUBLE
    ) USING iceberg
""")

# Insert data in waves to create multiple snapshots
import time

for i in range(3):
    spark.sql(f"""
        INSERT INTO drls.travel_demo.metrics VALUES
        (current_timestamp(), 'cpu', {50 + i * 10}.0),
        (current_timestamp(), 'memory', {60 + i * 5}.0)
    """)
    print(f"Snapshot {i+1}: inserted 2 rows")
    time.sleep(1)  # small delay so timestamps differ

# View history
print("\n--- Snapshot History ---")
h = history(spark, "drls.travel_demo.metrics")
for snap in h["snapshots"]:
    print(f"  ID: {snap['snapshot_id']}, Time: {snap['committed_at']}, "
          f"Operation: {snap['operation']}")

# Query current state
current_count = spark.table("drls.travel_demo.metrics").count()
print(f"\nCurrent row count: {current_count}")  # 6

# Time-travel to the first snapshot
first_snap_id = h["snapshots"][0]["snapshot_id"]
print(f"\n--- Time travel to snapshot {first_snap_id} ---")
old_df = snapshot_at(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Row count at first snapshot: {old_df.count()}")  # 2
old_df.show()

# Rollback to first snapshot
print(f"\n--- Rolling back to snapshot {first_snap_id} ---")
result = rollback_to(spark, "drls.travel_demo.metrics",
                     snapshot_id=first_snap_id)
print(f"Rollback result: {result}")

rolled_back_count = spark.table("drls.travel_demo.metrics").count()
print(f"Row count after rollback: {rolled_back_count}")  # 2

# Cleanup
drls.stop_spark()
ray.shutdown()
print("\nTime travel demo complete!")