Schema Evolution¶
Iceberg supports safe, backward-compatible schema evolution. DRLS exposes these operations through its Python API and CLI.
Get Current Schema¶
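The call mirrors the `get_schema` usage in the full example at the end of this page; it assumes an active `spark` session created with `drls.init_spark`:

```python
from drls.iceberg import get_schema

result = get_schema(spark, "drls.db.events")
```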
Returns:
```python
{
    "success": True,
    "table": "drls.db.events",
    "columns": [
        {"name": "id", "type": "bigint", "nullable": False, "comment": None},
        {"name": "ts", "type": "timestamp", "nullable": True, "comment": None},
        {"name": "payload", "type": "string", "nullable": True, "comment": None},
    ],
}
```
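The return value is a plain Python dictionary, so it is easy to inspect before evolving a schema. The sketch below hard-codes the example payload from above rather than calling the API:

```python
# Example return value from get_schema, copied verbatim.
schema = {
    "success": True,
    "table": "drls.db.events",
    "columns": [
        {"name": "id", "type": "bigint", "nullable": False, "comment": None},
        {"name": "ts", "type": "timestamp", "nullable": True, "comment": None},
        {"name": "payload", "type": "string", "nullable": True, "comment": None},
    ],
}

# Build a name -> type mapping, then check whether a column already
# exists before attempting to add it.
types = {c["name"]: c["type"] for c in schema["columns"]}
has_source = "source" in types
```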
Add Column¶
Add a new column to the table:
```python
from drls.iceberg import add_column

result = add_column(
    spark,
    "drls.db.events",
    name="source",
    type="STRING",
    after="ts",              # Optional: position after existing column
    comment="Event source",  # Optional: column comment
)
```
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| `spark` | `SparkSession` | Yes | Active Spark session |
| `table` | `str` | Yes | Fully qualified table name |
| `name` | `str` | Yes | New column name |
| `type` | `str` | Yes | Spark SQL type (e.g., `STRING`, `BIGINT`, `DOUBLE`) |
| `after` | `str \| None` | No | Insert after this column |
| `comment` | `str \| None` | No | Column comment |
Rename Column¶
```python
from drls.iceberg import rename_column

result = rename_column(spark, "drls.db.events", old_name="payload", new_name="data")
```
Drop Column¶
Warning
Dropping a column is irreversible. Existing data for the column will no longer be readable.
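A call might look like the following. The keyword argument `name=` is assumed by analogy with `add_column` and is not confirmed by this page:

```python
from drls.iceberg import drop_column

# Irreversible: the column's data becomes unreadable afterwards.
result = drop_column(spark, "drls.db.events", name="payload")
```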
Alter Column Type¶
Iceberg only allows safe, widening type promotions. For example, assuming the table has an `INT` column named `retries` (not part of the example schema above), it can be widened to `BIGINT`:

```python
from drls.iceberg import alter_column_type

# INT -> BIGINT (long) is a safe widening promotion.
result = alter_column_type(spark, "drls.db.events", name="retries", new_type="BIGINT")
```
Allowed type promotions:
| From | To |
|---|---|
| `int` | `long` |
| `float` | `double` |
| `decimal(P, S)` | `decimal(P', S)` where `P' > P` |
Note
Arbitrary type changes are not supported by Iceberg. Only widening promotions are allowed.
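The promotion rules in the table above can be expressed as a small validity check. This helper is purely illustrative and is not part of the DRLS API:

```python
import re

def is_safe_promotion(from_type: str, to_type: str) -> bool:
    """Return True if Iceberg allows widening from_type to to_type."""
    # Fixed-width promotions: int -> long, float -> double.
    if (from_type, to_type) in {("int", "long"), ("float", "double")}:
        return True
    # decimal(P, S) -> decimal(P', S): precision may widen, scale must not change.
    m_from = re.fullmatch(r"decimal\((\d+),\s*(\d+)\)", from_type)
    m_to = re.fullmatch(r"decimal\((\d+),\s*(\d+)\)", to_type)
    if m_from and m_to:
        p1, s1 = map(int, m_from.groups())
        p2, s2 = map(int, m_to.groups())
        return p2 > p1 and s1 == s2
    return False
```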
Example: Full Schema Evolution¶
```python
import ray
import drls
from drls.iceberg import add_column, rename_column, drop_column, get_schema

ray.init()
spark = drls.init_spark("schema-demo", 1, 1, "512M",
                        iceberg_catalog="hadoop",
                        iceberg_warehouse="/tmp/warehouse")

# Create table
spark.sql("""
    CREATE TABLE drls.db.users (
        id BIGINT, name STRING, email STRING
    ) USING iceberg
""")

# Add a column
add_column(spark, "drls.db.users", "created_at", "TIMESTAMP")

# Rename a column
rename_column(spark, "drls.db.users", "email", "email_address")

# Check schema
schema = get_schema(spark, "drls.db.users")
for col in schema["columns"]:
    print(f"  {col['name']}: {col['type']}")

drls.stop_spark()
ray.shutdown()
```