Iceberg Operations¶
DRLS provides a complete set of Iceberg table maintenance operations, accessible via both a Python API and a CLI.
Catalog Configuration¶
Configure the Iceberg catalog using IcebergCatalogConfig:
```python
from drls.iceberg.catalog import IcebergCatalogConfig, configure_iceberg

config = IcebergCatalogConfig(
    catalog_type="hadoop",  # hadoop, hive, rest, polaris, glue, nessie
    warehouse="/tmp/warehouse",
    catalog_name="drls",
    uri=None,               # Required for hive, rest, polaris, nessie
    extra_props={},         # Additional catalog properties
)

spark_configs = configure_iceberg(config)
```
Supported Catalog Types¶
| Type | URI Required | Description |
|---|---|---|
| `hadoop` | No | Local/HDFS filesystem catalog |
| `hive` | Yes | Hive Metastore catalog |
| `rest` | Yes | REST catalog (e.g., Tabular) |
| `polaris` | Yes | Snowflake Polaris catalog |
| `glue` | No | AWS Glue Data Catalog |
| `nessie` | Yes | Project Nessie catalog |
Table Health¶
Get a health report for an Iceberg table:
Returns:
```python
{
    "success": True,
    "table": "drls.db.events",
    "snapshot_count": 15,
    "data_file_count": 42,
    "total_size_bytes": 1048576,
    "avg_file_size_bytes": 24966,
    "small_file_count": 30,
    "recommendation": "Consider compaction — 30 small files detected"
}
```
CLI equivalent:
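The report above can be derived from a table's data-file sizes with simple thresholding. The sketch below is plain Python for illustration; the 100 MB small-file cutoff is an assumption, not a documented DRLS constant:

```python
SMALL_FILE_THRESHOLD = 100 * 1024 * 1024  # assumed cutoff; illustrative only

def summarize_files(table: str, file_sizes: list[int], snapshot_count: int) -> dict:
    """Sketch of how a health report like the one above could be
    computed from a table's data-file sizes."""
    total = sum(file_sizes)
    small = sum(1 for s in file_sizes if s < SMALL_FILE_THRESHOLD)
    report = {
        "success": True,
        "table": table,
        "snapshot_count": snapshot_count,
        "data_file_count": len(file_sizes),
        "total_size_bytes": total,
        "avg_file_size_bytes": total // len(file_sizes) if file_sizes else 0,
        "small_file_count": small,
    }
    if small > 0:
        report["recommendation"] = f"Consider compaction — {small} small files detected"
    return report
```

A table whose average file size sits far below the cutoff is the classic candidate for the compaction operation described next.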
Compaction¶
Rewrite data files to optimize read performance:
```python
from drls.iceberg import compact_table

result = compact_table(spark, "drls.db.events", strategy="binpack")
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | — | Active Spark session |
| `table` | `str` | — | Fully qualified table name |
| `strategy` | `str` | `"binpack"` | Compaction strategy: `binpack`, `sort`, `zorder` |
| `options` | `dict[str, str] \| None` | `None` | Additional compaction options |
Returns: Dict with `rewritten_files`, `new_files`, `rewritten_bytes`, and `success`.
CLI equivalent:
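A binpack rewrite combines many small files into fewer target-sized output files. The following is a self-contained first-fit sketch of that grouping idea only; the actual target size and rewrite are handled by Iceberg, not by code like this:

```python
def binpack(file_sizes: list[int], target: int) -> list[list[int]]:
    """First-fit sketch of bin-pack compaction: pack input file sizes
    into groups whose combined size stays at or under `target`."""
    groups: list[list[int]] = []
    totals: list[int] = []
    for size in sorted(file_sizes, reverse=True):  # largest first
        for i, t in enumerate(totals):
            if t + size <= target:     # fits in an existing group
                groups[i].append(size)
                totals[i] += size
                break
        else:                          # no group had room; open a new one
            groups.append([size])
            totals.append(size)
    return groups

groups = binpack([60, 40, 30, 20, 10], target=100)  # → [[60, 40], [30, 20, 10]]
```

Each resulting group corresponds to one rewritten output file, which is why `rewritten_files` typically exceeds `new_files` after a successful compaction.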
Snapshot Expiration¶
Remove old snapshots to reclaim metadata storage:
```python
from drls.iceberg import expire_snapshots

result = expire_snapshots(spark, "drls.db.events", retain_last=5)
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | — | Active Spark session |
| `table` | `str` | — | Fully qualified table name |
| `retain_last` | `int` | `5` | Number of recent snapshots to keep |
| `older_than` | `str \| None` | `None` | Expire snapshots older than this timestamp (ISO8601) |
CLI equivalent:
Orphan File Removal¶
Remove data files not referenced by any snapshot:
```python
from drls.iceberg import remove_orphan_files

# Dry run first
result = remove_orphan_files(spark, "drls.db.events", dry_run=True)
print(f"Would remove {result['orphan_count']} files")

# Actually remove
result = remove_orphan_files(spark, "drls.db.events")
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `spark` | `SparkSession` | — | Active Spark session |
| `table` | `str` | — | Fully qualified table name |
| `older_than` | `str \| None` | `None` | Only remove files older than this timestamp |
| `dry_run` | `bool` | `False` | List files without removing them |
CLI equivalent:
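Conceptually, orphan detection is a set difference: files present under the table location but referenced by no snapshot's manifests. A self-contained sketch of that core check (the paths and result shape are illustrative, not the exact DRLS return format):

```python
def find_orphans(files_on_disk: set[str], referenced: set[str],
                 dry_run: bool = True) -> dict:
    """Sketch: an orphan is any file under the table location that no
    snapshot references. With dry_run=True, report only; never delete."""
    orphans = sorted(files_on_disk - referenced)
    result = {"orphan_count": len(orphans), "orphans": orphans, "dry_run": dry_run}
    if not dry_run:
        # a real implementation would delete here, honoring older_than
        result["removed"] = len(orphans)
    return result

report = find_orphans(
    {"data/a.parquet", "data/b.parquet", "data/tmp.parquet"},
    {"data/a.parquet", "data/b.parquet"},
)
```

This is also why the `older_than` guard matters in practice: a file written by an in-flight commit is unreferenced only temporarily, and an age cutoff keeps it from being swept up as an orphan.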
See Also¶
- Schema Evolution — Add, rename, drop columns
- Partition Evolution — Modify partition layout
- Time Travel — Query historical snapshots
- CDC — Change data capture streaming