# Python API Reference

Module-by-module reference for the `drls` Python package.

## drls

Top-level public API.

### init_spark()
```python
def init_spark(
    app_name: str,
    num_executors: int,
    executor_cores: int,
    executor_memory: str | int,
    enable_hive: bool = False,
    fault_tolerant_mode: bool = True,
    placement_group_strategy: str | None = None,
    placement_group: PlacementGroup | None = None,
    placement_group_bundle_indexes: list[int] | None = None,
    iceberg_catalog: str | None = None,
    iceberg_warehouse: str | None = None,
    iceberg_catalog_name: str = "drls",
    iceberg_catalog_uri: str | None = None,
    iceberg_catalog_props: dict[str, str] | None = None,
    configs: dict[str, str] | None = None,
) -> SparkSession
```
Initialize a Spark cluster on the Ray backend and return an active `SparkSession`.
### stop_spark()

Stop the Spark cluster and clean up Ray resources.
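A minimal start/stop sketch, assuming both functions are exported at the top level (as the "Top-level public API" heading suggests); the app name and resource values are illustrative:

```python
import drls

# Bring up a 4-executor Spark-on-Ray cluster; size these values to
# match your Ray cluster's capacity.
spark = drls.init_spark(
    app_name="etl-job",
    num_executors=4,
    executor_cores=2,
    executor_memory="4GB",
)

print(spark.range(1_000_000).count())

# Tear down the executors and release Ray resources.
drls.stop_spark()
```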
### spark_dataframe_to_ray_dataset()

```python
def spark_dataframe_to_ray_dataset(
    df: pyspark.sql.DataFrame,
    parallelism: int | None = None,
    owner: PartitionObjectsOwner | None = None,
) -> ray.data.Dataset
```
Convert a Spark DataFrame to a Ray Dataset.
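A short sketch of the conversion, assuming `spark` is a session returned by `init_spark()`:

```python
import drls

# `spark` is a session returned by drls.init_spark(...).
sdf = spark.range(1000)

# Materialize the Spark partitions as a Ray Dataset.
ds = drls.spark_dataframe_to_ray_dataset(sdf, parallelism=4)
print(ds.count())  # 1000
```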
### ray_dataset_to_spark_dataframe()

```python
def ray_dataset_to_spark_dataframe(
    spark: SparkSession,
    arrow_schema,
    blocks: list[ObjectRef],
    locations=None,
) -> pyspark.sql.DataFrame
```
Convert Ray Dataset blocks back into a Spark DataFrame. When `locations` is supplied, partitions are scheduled with locality awareness.
## drls.iceberg.catalog

### IcebergCatalogConfig

```python
@dataclass
class IcebergCatalogConfig:
    catalog_type: str  # one of: "hadoop", "hive", "rest", "polaris", "glue", "nessie"
    warehouse: str
    catalog_name: str = "drls"
    uri: str | None = None
    extra_props: dict[str, str] = field(default_factory=dict)
```
### configure_iceberg()

Build a Spark configuration dict for an Iceberg catalog.
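A sketch of building a catalog config. The `configure_iceberg()` signature is not shown above, so the call below assumes it accepts an `IcebergCatalogConfig`; the warehouse path and REST endpoint are illustrative:

```python
from drls.iceberg.catalog import IcebergCatalogConfig, configure_iceberg

config = IcebergCatalogConfig(
    catalog_type="rest",
    warehouse="s3://my-bucket/warehouse",  # illustrative location
    uri="http://localhost:8181",           # illustrative REST endpoint
)

# Assumed calling convention: configure_iceberg(config) returns a dict of
# Spark settings that can be passed to init_spark(configs=...).
spark_confs = configure_iceberg(config)
```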
## drls.iceberg.table_ops

### compact_table()

```python
def compact_table(
    spark: SparkSession,
    table: str,
    strategy: str = "binpack",
    options: dict[str, str] | None = None,
) -> dict[str, Any]
```

Rewrite small data files into larger ones using the given strategy and return a summary of the rewrite.
### expire_snapshots()

```python
def expire_snapshots(
    spark: SparkSession,
    table: str,
    retain_last: int = 5,
    older_than: str | None = None,
) -> dict[str, Any]
```

Expire old table snapshots, keeping at least the `retain_last` most recent.
### remove_orphan_files()

```python
def remove_orphan_files(
    spark: SparkSession,
    table: str,
    older_than: str | None = None,
    dry_run: bool = False,
) -> dict[str, Any]
```

Remove files in the table location that are no longer referenced by table metadata. Set `dry_run=True` to list candidates without deleting.
### get_table_health()

Return a health summary for the table.
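A routine maintenance pass might chain these operations. `spark` is a session from `init_spark()`, and the table identifier is hypothetical:

```python
from drls.iceberg import table_ops

TABLE = "drls.db.events"  # hypothetical table identifier

# Compact small files, then trim snapshot history.
table_ops.compact_table(spark, TABLE, strategy="binpack")
table_ops.expire_snapshots(spark, TABLE, retain_last=5)

# Preview orphan-file cleanup before deleting anything.
report = table_ops.remove_orphan_files(spark, TABLE, dry_run=True)
print(report)
```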
## drls.iceberg.schema_evolution

### add_column()

```python
def add_column(
    spark: SparkSession,
    table: str,
    name: str,
    type: str,
    after: str | None = None,
    comment: str | None = None,
) -> dict[str, Any]
```

Add a column to the table, optionally positioned `after` an existing column and annotated with a `comment`.
### rename_column()

```python
def rename_column(
    spark: SparkSession,
    table: str,
    old_name: str,
    new_name: str,
) -> dict[str, Any]
```

Rename a column from `old_name` to `new_name`.
### drop_column()

Drop a column from the table.
### alter_column_type()

```python
def alter_column_type(
    spark: SparkSession,
    table: str,
    name: str,
    new_type: str,
) -> dict[str, Any]
```

Change a column's type. Note that Iceberg permits only safe promotions (for example `int` to `long`, or `float` to `double`).
### get_schema()

Return the table's current schema.
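A schema-evolution sketch; the table and column names are hypothetical, and the `get_schema(spark, table)` call assumes it mirrors the calling convention of the other functions in this module:

```python
from drls.iceberg import schema_evolution as se

TABLE = "drls.db.events"  # hypothetical; column names are too

se.add_column(spark, TABLE, name="country", type="string",
              after="user_id", comment="ISO 3166-1 alpha-2 code")
se.rename_column(spark, TABLE, old_name="ts", new_name="event_ts")
se.alter_column_type(spark, TABLE, name="amount", new_type="double")

print(se.get_schema(spark, TABLE))
```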
## drls.iceberg.partition_evolution

### add_partition_field()

```python
def add_partition_field(
    spark: SparkSession,
    table: str,
    field: str,
    transform: str | None = None,
) -> dict[str, Any]
```

Add a partition field, optionally wrapped in a `transform` such as `day` or `bucket`.
### drop_partition_field()

```python
def drop_partition_field(
    spark: SparkSession,
    table: str,
    field: str,
    transform: str | None = None,
) -> dict[str, Any]
```

Drop a partition field, matching the same `field`/`transform` pair used when it was added.
### get_partition_spec()

Return the table's current partition spec.
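A partition-evolution sketch with a hypothetical table and field names. Iceberg partition evolution is metadata-only, so existing files keep the old spec and only new writes use the new one:

```python
from drls.iceberg import partition_evolution as pe

TABLE = "drls.db.events"  # hypothetical

# Partition new data by day(event_ts); existing files are untouched.
pe.add_partition_field(spark, TABLE, field="event_ts", transform="day")
pe.drop_partition_field(spark, TABLE, field="region")

print(pe.get_partition_spec(spark, TABLE))
```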
## drls.iceberg.time_travel

### snapshot_at()

```python
def snapshot_at(
    spark: SparkSession,
    table: str,
    snapshot_id: int | None = None,
    timestamp: str | None = None,
) -> pyspark.sql.DataFrame
```

Read the table as of a given snapshot ID or timestamp.
### history()

Return the table's snapshot history.

### rollback_to()

Roll the table back to an earlier snapshot.
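A time-travel sketch; the table name, snapshot ID, and timestamp string format are assumptions:

```python
from drls.iceberg import time_travel as tt

TABLE = "drls.db.events"  # hypothetical

# Pin a read to a wall-clock instant (timestamp format is an assumption)...
df = tt.snapshot_at(spark, TABLE, timestamp="2024-06-01 00:00:00")

# ...or to an exact snapshot id taken from history().
df = tt.snapshot_at(spark, TABLE, snapshot_id=1234567890123456789)
```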
## drls.iceberg.cdc

### read_cdc_stream()

```python
def read_cdc_stream(
    spark: SparkSession,
    table: str,
    start_snapshot: int | None = None,
) -> pyspark.sql.DataFrame
```

Read change records from the table, optionally starting after `start_snapshot`.
### cdc_to_ray()

```python
def cdc_to_ray(
    spark: SparkSession,
    table: str,
    start_snapshot: int | None = None,
    **kwargs: Any,
) -> tuple[Any, StreamingQuery]
```

Stream change records from the table into Ray. Returns the Ray-side handle and the underlying `StreamingQuery`.
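A CDC sketch; the table name and snapshot ID are hypothetical. The returned `StreamingQuery` is a standard PySpark handle, so `stop()` shuts the stream down:

```python
from drls.iceberg import cdc

TABLE = "drls.db.events"  # hypothetical

# Follow changes committed after a known snapshot.
changes = cdc.read_cdc_stream(spark, TABLE, start_snapshot=1234567890)

# Or push the change stream into Ray; the returned query controls shutdown.
handle, query = cdc.cdc_to_ray(spark, TABLE)
query.stop()
```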
## drls.streaming

### from_spark_streaming()

```python
def from_spark_streaming(
    streaming_df,
    stream_id=None,
    max_buffered_batches=64,
    max_buffered_bytes=2 * 1024**3,
    trigger=None,
    checkpoint_location=None,
    use_jvm_sink=False,
) -> tuple[StreamingIterator, StreamingQueryWrapper]
```

Consume a Spark structured-streaming DataFrame through a `StreamingIterator`. Returns the iterator and a wrapper around the started query.
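A consumption sketch using Spark's built-in rate source as a toy input. The `trigger` string format and the assumption that `StreamingIterator` yields one Arrow table per micro-batch are not confirmed by the signature above:

```python
from drls.streaming import from_spark_streaming

# A toy source: Spark's built-in rate stream.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

iterator, query = from_spark_streaming(stream_df, trigger="5 seconds")

# Assumed: the iterator yields one Arrow table per micro-batch.
for batch in iterator:
    print(batch.num_rows)
    break
```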
### to_spark_streaming()

```python
def to_spark_streaming(
    source: Iterator[pa.Table] | Callable[[], pa.Table | None],
    spark,
    stream_id: str | None = None,
    max_buffered_batches: int = 64,
    max_buffered_bytes: int = 2 * 1024**3,
) -> tuple[DataFrame, _ReverseStreamHandle]
```

Expose an Arrow-table source (an iterator, or a zero-argument callable returning the next table or `None`) as a Spark streaming DataFrame. Returns the DataFrame and a handle for the reverse stream.
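A sketch of the callable-source form. The callable's `None`-return semantics come only from the signature's `pa.Table | None` typing; the console sink is just for demonstration:

```python
import pyarrow as pa

from drls.streaming import to_spark_streaming

def next_table():
    # Callable form of the source: return the next Arrow table, or None
    # (allowed by the signature; exact semantics are not documented here).
    return pa.table({"value": [1, 2, 3]})

df, handle = to_spark_streaming(next_table, spark)
query = df.writeStream.format("console").start()
```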
### create_partitioned_iterators()

```python
def create_partitioned_iterators(
    coordinator,
    num_consumers: int,
    num_partitions: int,
) -> list[StreamingIterator]
```

Fan a stream coordinator's output out across `num_consumers` consumers, partitioned `num_partitions` ways; returns one `StreamingIterator` per consumer.
## drls.agentic.agent

### TRex

```python
class TRex:
    def __init__(
        self,
        spark: SparkSession,
        provider: str = "ollama",
        model: str = "llama3:70b",
        api_base: str | None = None,
        api_key: str | None = None,
    )

    def run(self, prompt: str) -> dict[str, Any]
```

LLM-backed agent bound to a `SparkSession`. `run()` executes a natural-language prompt and returns a structured result.
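A usage sketch with the default local Ollama provider; the prompt and table name are illustrative:

```python
from drls.agentic.agent import TRex

# Bind the agent to an active session from init_spark().
agent = TRex(spark, provider="ollama", model="llama3:70b")

# run() returns a structured result dict.
result = agent.run("How many snapshots does drls.db.events have?")
print(result)
```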
## drls.agentic.tools

### get_tool_names()

Return the names of the available tools.

### get_tool_definition()

Return the OpenAI-format definition for a single tool.

### TOOL_DEFINITIONS

List of 13 OpenAI-format tool definitions. See the Tool Definitions Reference.
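A sketch of enumerating the tool registry. The lookup-by-name convention for `get_tool_definition()` is an assumption; the nested `"function"` key follows the OpenAI tool-definition format named above:

```python
from drls.agentic.tools import (
    TOOL_DEFINITIONS,
    get_tool_definition,
    get_tool_names,
)

print(len(TOOL_DEFINITIONS))  # 13, per the note above

for name in get_tool_names():
    tool = get_tool_definition(name)  # assumed: lookup by tool name
    # OpenAI-format definitions nest metadata under "function".
    print(name, tool["function"]["description"])
```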