
Schema Evolution

Iceberg supports safe, backward-compatible schema evolution. DRLS exposes these operations via Python API and CLI.

Get Current Schema

from drls.iceberg import get_schema

schema = get_schema(spark, "drls.db.events")

Returns:

{
    "success": True,
    "table": "drls.db.events",
    "columns": [
        {"name": "id", "type": "bigint", "nullable": False, "comment": None},
        {"name": "ts", "type": "timestamp", "nullable": True, "comment": None},
        {"name": "payload", "type": "string", "nullable": True, "comment": None},
    ]
}
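The returned dictionary is plain data, so it is easy to inspect or render. A minimal sketch using the example payload above; `format_columns` is our own illustrative helper, not part of DRLS:

```python
def format_columns(schema):
    """Render a get_schema() result as DDL-style lines."""
    lines = []
    for col in schema["columns"]:
        null = "" if col["nullable"] else " NOT NULL"
        lines.append(f"{col['name']} {col['type']}{null}")
    return lines

schema = {
    "success": True,
    "table": "drls.db.events",
    "columns": [
        {"name": "id", "type": "bigint", "nullable": False, "comment": None},
        {"name": "ts", "type": "timestamp", "nullable": True, "comment": None},
        {"name": "payload", "type": "string", "nullable": True, "comment": None},
    ],
}
print("\n".join(format_columns(schema)))
```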

Add Column

Add a new column to the table:

from drls.iceberg import add_column

result = add_column(
    spark,
    "drls.db.events",
    name="source",
    type="STRING",
    after="ts",           # Optional: position after existing column
    comment="Event source",  # Optional: column comment
)

Parameters:

| Parameter | Type          | Required | Description                                  |
|-----------|---------------|----------|----------------------------------------------|
| spark     | SparkSession  | Yes      | Active Spark session                         |
| table     | str           | Yes      | Fully qualified table name                   |
| name      | str           | Yes      | New column name                              |
| type      | str           | Yes      | Spark SQL type (e.g., STRING, BIGINT, DOUBLE) |
| after     | str \| None   | No       | Insert after this column                     |
| comment   | str \| None   | No       | Column comment                               |
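A call like this presumably maps onto a Spark SQL `ALTER TABLE ... ADD COLUMN` statement. A sketch of assembling that statement from the same parameters; `build_add_column_sql` is a hypothetical helper, not part of DRLS:

```python
def build_add_column_sql(table, name, type_, after=None, comment=None):
    """Assemble an Iceberg-style ALTER TABLE ... ADD COLUMN statement."""
    sql = f"ALTER TABLE {table} ADD COLUMN {name} {type_}"
    if comment is not None:
        sql += f" COMMENT '{comment}'"
    if after is not None:
        sql += f" AFTER {after}"
    return sql

print(build_add_column_sql("drls.db.events", "source", "STRING",
                           after="ts", comment="Event source"))
```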

Rename Column

from drls.iceberg import rename_column

result = rename_column(spark, "drls.db.events", old_name="payload", new_name="data")

Drop Column

from drls.iceberg import drop_column

result = drop_column(spark, "drls.db.events", name="source")

Warning

Dropping a column is irreversible. Existing data for the column will no longer be readable.
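Because the drop is irreversible, it can be worth confirming the column actually exists before issuing it. A small guard sketch using the dictionary shape that get_schema returns; `column_exists` is our own helper, not part of DRLS:

```python
def column_exists(schema, name):
    """Check a get_schema() result for a column by name."""
    return any(col["name"] == name for col in schema["columns"])

schema = {
    "success": True,
    "table": "drls.db.events",
    "columns": [
        {"name": "id", "type": "bigint", "nullable": False, "comment": None},
        {"name": "source", "type": "string", "nullable": True, "comment": None},
    ],
}

if column_exists(schema, "source"):
    # Safe to call drop_column(spark, "drls.db.events", name="source")
    pass
```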

Alter Column Type

Iceberg only allows safe, widening type promotions, so the target type must be a legal promotion of the column's current type. For example, widening an INT column to BIGINT (the column name below is illustrative):

from drls.iceberg import alter_column_type

# Widen an INT column to BIGINT (int -> long); the column name is hypothetical
result = alter_column_type(spark, "drls.db.events", name="retry_count", new_type="BIGINT")

Allowed type promotions:

| From          | To                               |
|---------------|----------------------------------|
| int           | long                             |
| float         | double                           |
| decimal(P, S) | decimal(P', S) where P' > P      |

Note

Arbitrary type changes are not supported by Iceberg. Only widening promotions are allowed.
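The promotion rules above are simple enough to check client-side before issuing the change. A sketch of such a validator, assuming the lowercase type names that get_schema returns; `is_safe_promotion` is our own helper, not part of DRLS:

```python
import re

_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")

def is_safe_promotion(from_type: str, to_type: str) -> bool:
    """Return True if from_type -> to_type is an allowed Iceberg widening."""
    if (from_type, to_type) in {("int", "long"), ("float", "double")}:
        return True
    src, dst = _DECIMAL.fullmatch(from_type), _DECIMAL.fullmatch(to_type)
    if src and dst:
        # Precision may grow; scale must stay the same.
        return int(dst.group(1)) > int(src.group(1)) and src.group(2) == dst.group(2)
    return False
```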

Example: Full Schema Evolution

import ray
import drls
from drls.iceberg import add_column, rename_column, drop_column, get_schema

ray.init()
spark = drls.init_spark("schema-demo", 1, 1, "512M",
                        iceberg_catalog="hadoop",
                        iceberg_warehouse="/tmp/warehouse")

# Create table
spark.sql("""
    CREATE TABLE drls.db.users (
        id BIGINT, name STRING, email STRING
    ) USING iceberg
""")

# Add a column
add_column(spark, "drls.db.users", "created_at", "TIMESTAMP")

# Rename a column
rename_column(spark, "drls.db.users", "email", "email_address")

# Check schema
schema = get_schema(spark, "drls.db.users")
for col in schema["columns"]:
    print(f"  {col['name']}: {col['type']}")

drls.stop_spark()
ray.shutdown()