Python API Reference

Module-by-module reference for the drls Python package.

drls

Top-level public API.

init_spark()

def init_spark(
    app_name: str,
    num_executors: int,
    executor_cores: int,
    executor_memory: str | int,
    enable_hive: bool = False,
    fault_tolerant_mode: bool = True,
    placement_group_strategy: str | None = None,
    placement_group: PlacementGroup | None = None,
    placement_group_bundle_indexes: list[int] | None = None,
    iceberg_catalog: str | None = None,
    iceberg_warehouse: str | None = None,
    iceberg_catalog_name: str = "drls",
    iceberg_catalog_uri: str | None = None,
    iceberg_catalog_props: dict[str, str] | None = None,
    configs: dict[str, str] | None = None,
) -> SparkSession

Initialize a Spark cluster on the Ray backend and return the active SparkSession.

stop_spark()

def stop_spark(cleanup_data: bool = True) -> None

Stop the Spark cluster and clean up Ray resources.
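
A minimal lifecycle sketch; the application name and resource sizes are illustrative.

import drls

spark = drls.init_spark(
    app_name="example-app",        # illustrative values
    num_executors=2,
    executor_cores=4,
    executor_memory="8g",
)
try:
    spark.range(10).show()         # any Spark workload
finally:
    drls.stop_spark(cleanup_data=True)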

spark_dataframe_to_ray_dataset()

def spark_dataframe_to_ray_dataset(
    df: pyspark.sql.DataFrame,
    parallelism: int | None = None,
    owner: PartitionObjectsOwner | None = None,
) -> ray.data.Dataset

Convert a Spark DataFrame to a Ray Dataset.

ray_dataset_to_spark_dataframe()

def ray_dataset_to_spark_dataframe(
    spark: SparkSession,
    arrow_schema,
    blocks: list[ObjectRef],
    locations=None,
) -> pyspark.sql.DataFrame

Convert Ray Dataset blocks to a Spark DataFrame, using the optional block locations for locality-aware scheduling.
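
A round-trip sketch, assuming an active SparkSession named spark from init_spark(). The sample data and Arrow schema are illustrative, and using Ray's Dataset.to_arrow_refs() to obtain the block ObjectRefs is an assumption about how callers supply the blocks argument.

import pyarrow as pa
from drls import spark_dataframe_to_ray_dataset, ray_dataset_to_spark_dataframe

# Spark -> Ray: distribute the DataFrame's partitions as Ray blocks.
sdf = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["id", "value"])
ds = spark_dataframe_to_ray_dataset(sdf, parallelism=2)

# Ray -> Spark: supply an Arrow schema plus ObjectRefs to the Arrow blocks.
arrow_schema = pa.schema([("id", pa.int64()), ("value", pa.float64())])
blocks = ds.to_arrow_refs()        # assumption: Arrow table refs for each block
sdf_back = ray_dataset_to_spark_dataframe(spark, arrow_schema, blocks)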


drls.iceberg.catalog

IcebergCatalogConfig

@dataclass
class IcebergCatalogConfig:
    catalog_type: str           # hadoop, hive, rest, polaris, glue, nessie
    warehouse: str
    catalog_name: str = "drls"
    uri: str | None = None
    extra_props: dict[str, str] = field(default_factory=dict)

configure_iceberg()

def configure_iceberg(config: IcebergCatalogConfig) -> dict[str, str]

Build a Spark configuration dict for an Iceberg catalog.
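
A configuration sketch with illustrative catalog settings. Passing the resulting dict to init_spark() through configs (rather than the iceberg_* parameters) is one possible wiring, shown here as an assumption.

import drls
from drls.iceberg.catalog import IcebergCatalogConfig, configure_iceberg

cfg = IcebergCatalogConfig(
    catalog_type="hadoop",                    # illustrative catalog type
    warehouse="s3://my-bucket/warehouse",     # illustrative warehouse path
    catalog_name="drls",
)
iceberg_confs = configure_iceberg(cfg)        # plain dict of Spark confs

spark = drls.init_spark(
    app_name="iceberg-app",
    num_executors=2,
    executor_cores=4,
    executor_memory="8g",
    configs=iceberg_confs,
)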


drls.iceberg.table_ops

compact_table()

def compact_table(
    spark: SparkSession,
    table: str,
    strategy: str = "binpack",
    options: dict[str, str] | None = None,
) -> dict[str, Any]

expire_snapshots()

def expire_snapshots(
    spark: SparkSession,
    table: str,
    retain_last: int = 5,
    older_than: str | None = None,
) -> dict[str, Any]

remove_orphan_files()

def remove_orphan_files(
    spark: SparkSession,
    table: str,
    older_than: str | None = None,
    dry_run: bool = False,
) -> dict[str, Any]

get_table_health()

def get_table_health(
    spark: SparkSession,
    table: str,
) -> dict[str, Any]
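
A maintenance sketch over a single table, assuming an active SparkSession named spark. The table identifier is illustrative; each call returns a summary dict as declared above.

from drls.iceberg.table_ops import (
    compact_table, expire_snapshots, remove_orphan_files, get_table_health,
)

table = "drls.db.events"                          # illustrative identifier

health = get_table_health(spark, table)           # inspect before maintenance
compact_table(spark, table, strategy="binpack")   # rewrite small data files
expire_snapshots(spark, table, retain_last=5)     # drop older snapshots
remove_orphan_files(spark, table, dry_run=True)   # list orphans without deleting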

drls.iceberg.schema_evolution

add_column()

def add_column(
    spark: SparkSession,
    table: str,
    name: str,
    type: str,
    after: str | None = None,
    comment: str | None = None,
) -> dict[str, Any]

rename_column()

def rename_column(
    spark: SparkSession,
    table: str,
    old_name: str,
    new_name: str,
) -> dict[str, Any]

drop_column()

def drop_column(
    spark: SparkSession,
    table: str,
    name: str,
) -> dict[str, Any]

alter_column_type()

def alter_column_type(
    spark: SparkSession,
    table: str,
    name: str,
    new_type: str,
) -> dict[str, Any]

get_schema()

def get_schema(
    spark: SparkSession,
    table: str,
) -> dict[str, Any]
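
A schema-evolution sketch, assuming an active SparkSession named spark; the table, column names, and types are illustrative.

from drls.iceberg.schema_evolution import (
    add_column, rename_column, alter_column_type, drop_column, get_schema,
)

table = "drls.db.events"                          # illustrative identifier

add_column(spark, table, name="country", type="string", comment="ISO code")
rename_column(spark, table, old_name="country", new_name="country_code")
alter_column_type(spark, table, name="amount", new_type="double")
drop_column(spark, table, name="legacy_flag")
print(get_schema(spark, table))                   # current schema as a dict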

drls.iceberg.partition_evolution

add_partition_field()

def add_partition_field(
    spark: SparkSession,
    table: str,
    field: str,
    transform: str | None = None,
) -> dict[str, Any]

drop_partition_field()

def drop_partition_field(
    spark: SparkSession,
    table: str,
    field: str,
    transform: str | None = None,
) -> dict[str, Any]

get_partition_spec()

def get_partition_spec(
    spark: SparkSession,
    table: str,
) -> dict[str, Any]
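
A partition-evolution sketch, assuming an active SparkSession named spark; the field name and transform are illustrative.

from drls.iceberg.partition_evolution import (
    add_partition_field, drop_partition_field, get_partition_spec,
)

table = "drls.db.events"                          # illustrative identifier

add_partition_field(spark, table, field="ts", transform="days")
spec = get_partition_spec(spark, table)           # current spec as a dict
drop_partition_field(spark, table, field="ts", transform="days")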

drls.iceberg.time_travel

snapshot_at()

def snapshot_at(
    spark: SparkSession,
    table: str,
    snapshot_id: int | None = None,
    timestamp: str | None = None,
) -> pyspark.sql.DataFrame

history()

def history(
    spark: SparkSession,
    table: str,
) -> dict[str, Any]

rollback_to()

def rollback_to(
    spark: SparkSession,
    table: str,
    snapshot_id: int,
) -> dict[str, Any]
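
A time-travel sketch, assuming an active SparkSession named spark; the snapshot id and timestamp are illustrative, and nothing is assumed about the history() dict beyond the declared return type.

from drls.iceberg.time_travel import snapshot_at, history, rollback_to

table = "drls.db.events"                          # illustrative identifier

hist = history(spark, table)                      # snapshot lineage as a dict
df_then = snapshot_at(spark, table, timestamp="2024-01-01 00:00:00")
df_snap = snapshot_at(spark, table, snapshot_id=1234567890)   # illustrative id
rollback_to(spark, table, snapshot_id=1234567890)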

drls.iceberg.cdc

read_cdc_stream()

def read_cdc_stream(
    spark: SparkSession,
    table: str,
    start_snapshot: int | None = None,
) -> pyspark.sql.DataFrame

cdc_to_ray()

def cdc_to_ray(
    spark: SparkSession,
    table: str,
    start_snapshot: int | None = None,
    **kwargs: Any,
) -> tuple[Any, StreamingQuery]
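
A CDC sketch, assuming an active SparkSession named spark. The start snapshot is illustrative; treating the second element of cdc_to_ray()'s return value as a standard Spark StreamingQuery follows the declared return type.

from drls.iceberg.cdc import read_cdc_stream, cdc_to_ray

table = "drls.db.events"                          # illustrative identifier

# Incremental changes as a Spark DataFrame.
changes = read_cdc_stream(spark, table, start_snapshot=1234567890)

# Or route the change stream toward Ray; the StreamingQuery drives the stream.
stream, query = cdc_to_ray(spark, table, start_snapshot=1234567890)
query.awaitTermination(60)                        # standard StreamingQuery API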

drls.streaming

from_spark_streaming()

def from_spark_streaming(
    streaming_df,
    stream_id=None,
    max_buffered_batches=64,
    max_buffered_bytes=2 * 1024**3,
    trigger=None,
    checkpoint_location=None,
    use_jvm_sink=False,
) -> tuple[StreamingIterator, StreamingQueryWrapper]
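
A sketch of bridging a Spark streaming DataFrame into Python, assuming an active SparkSession named spark. The rate source and the downstream handler process() are illustrative, and treating the StreamingIterator as an iterable of pyarrow.Table batches is an assumption about its interface.

from drls.streaming import from_spark_streaming

# Illustrative source; any streaming DataFrame works.
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

iterator, query = from_spark_streaming(streaming_df, max_buffered_batches=64)

for batch in iterator:             # assumption: yields pyarrow.Table batches
    process(batch)                 # hypothetical downstream handler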

to_spark_streaming()

def to_spark_streaming(
    source: Iterator[pa.Table] | Callable[[], pa.Table | None],
    spark,
    stream_id: str = None,
    max_buffered_batches: int = 64,
    max_buffered_bytes: int = 2 * 1024**3,
) -> tuple[DataFrame, _ReverseStreamHandle]
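
A sketch of the reverse direction, feeding Arrow tables into Spark as a stream. The generator is illustrative, and it is assumed the returned DataFrame can be consumed with Spark's standard writeStream API.

import pyarrow as pa
from drls.streaming import to_spark_streaming

def tables():
    # Illustrative finite source of Arrow tables.
    for i in range(3):
        yield pa.table({"id": [i], "value": [float(i)]})

sdf, handle = to_spark_streaming(tables(), spark)
query = sdf.writeStream.format("console").start()   # standard Spark sink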

create_partitioned_iterators()

def create_partitioned_iterators(
    coordinator,
    num_consumers: int,
    num_partitions: int,
) -> list[StreamingIterator]

drls.agentic.agent

TRex

class TRex:
    def __init__(
        self,
        spark: SparkSession,
        provider: str = "ollama",
        model: str = "llama3:70b",
        api_base: str | None = None,
        api_key: str | None = None,
    )

    def run(self, prompt: str) -> dict[str, Any]
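
A sketch of running the agent against a local model; the prompt is illustrative, the provider and model shown are the documented defaults, and an active SparkSession named spark is assumed.

from drls.agentic.agent import TRex

agent = TRex(spark, provider="ollama", model="llama3:70b")
result = agent.run("Summarize the health of table drls.db.events")
print(result)                      # dict describing the agent's outcome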

drls.agentic.tools

get_tool_names()

def get_tool_names() -> list[str]

get_tool_definition()

def get_tool_definition(name: str) -> dict[str, Any] | None
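
A sketch of listing the registered tools and their definitions; it only prints whatever the installed package exposes.

from drls.agentic.tools import get_tool_names, get_tool_definition

for name in get_tool_names():
    print(name, get_tool_definition(name))   # OpenAI-format definition or None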

TOOL_DEFINITIONS

List of 13 OpenAI-format tool definitions. See Tool Definitions Reference.