
# Python API

The core Python API provides Spark session management and DataFrame conversions between Spark, Ray, and Pandas.

## Session Management

### init_spark()

Initialize a Spark-on-Ray cluster and return an active SparkSession.

```python
import drls

spark = drls.init_spark(
    app_name="my-app",
    num_executors=2,
    executor_cores=1,
    executor_memory="1G",
)
```

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `app_name` | `str` | (required) | Spark application name |
| `num_executors` | `int` | (required) | Number of Spark executors (Ray actors) |
| `executor_cores` | `int` | (required) | CPU cores per executor |
| `executor_memory` | `str \| int` | (required) | Memory per executor (e.g., `"500M"`, `"2G"`) |
| `enable_hive` | `bool` | `False` | Enable Hive support |
| `fault_tolerant_mode` | `bool` | `True` | Enable fault-tolerant executor restart |
| `placement_group_strategy` | `str \| None` | `None` | Ray placement group strategy |
| `placement_group` | `PlacementGroup \| None` | `None` | Existing placement group |
| `placement_group_bundle_indexes` | `list[int] \| None` | `None` | Bundle indexes for placement |
| `iceberg_catalog` | `str \| None` | `None` | Iceberg catalog type (`hadoop`, `hive`, `rest`, `polaris`, `glue`, `nessie`) |
| `iceberg_warehouse` | `str \| None` | `None` | Warehouse path (e.g., `s3://...`, `/path/...`) |
| `iceberg_catalog_name` | `str` | `"drls"` | Catalog name |
| `iceberg_catalog_uri` | `str \| None` | `None` | Catalog URI (required for `hive`/`rest`/`polaris`/`nessie`) |
| `iceberg_catalog_props` | `dict[str, str] \| None` | `None` | Additional catalog properties |
| `configs` | `dict[str, str] \| None` | `None` | Additional Spark configuration key-value pairs |

Returns: SparkSession
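Note that `executor_memory` accepts either an `int` (bytes) or a size string such as `"500M"` or `"2G"`. A minimal pure-Python sketch of how such strings can be normalized to bytes — `parse_memory_string` is a hypothetical helper for illustration, not part of the drls API:

```python
# Hypothetical helper: normalize executor_memory values to a byte count.
# Shown only to illustrate the accepted formats; drls may parse differently.
UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}

def parse_memory_string(value):
    """Convert "500M"-style strings (or plain ints) to bytes."""
    if isinstance(value, int):
        return value
    value = value.strip().upper()
    if value and value[-1] in UNITS:
        return int(value[:-1]) * UNITS[value[-1]]
    return int(value)  # bare numeric strings are treated as bytes

print(parse_memory_string("1G"))    # 1073741824
print(parse_memory_string("500M"))  # 524288000
```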

### stop_spark()

Stop the Spark cluster and clean up Ray resources.

```python
drls.stop_spark(cleanup_data=True)
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `cleanup_data` | `bool` | `True` | Clean up Ray object store data |

## DataFrame Conversions

### Spark to Ray

```python
ray_ds = drls.spark_dataframe_to_ray_dataset(df, parallelism=None)
```

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `df` | `pyspark.sql.DataFrame` | (required) | Source Spark DataFrame |
| `parallelism` | `int \| None` | `None` | Target parallelism (defaults to the DataFrame's partition count) |

Returns: ray.data.Dataset
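The `parallelism` argument controls how many blocks the resulting Ray Dataset is split into; when it is `None`, the Spark DataFrame's partition count is used. A schematic sketch of that splitting behavior in pure Python — this is an illustration of the semantics, not the drls implementation:

```python
def split_into_blocks(rows, parallelism):
    """Schematic: distribute rows across `parallelism` blocks, the way the
    conversion distributes a DataFrame's rows across Ray Dataset blocks."""
    n = max(1, parallelism)
    size = -(-len(rows) // n)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

blocks = split_into_blocks(list(range(10)), 3)
print(len(blocks))                  # 3
print(sum(len(b) for b in blocks))  # 10 -- the row count is preserved
```

Whatever parallelism is chosen, the total row count is unchanged; only the block layout differs.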

### Ray to Spark

```python
df = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
```

| Parameter | Type | Description |
| --- | --- | --- |
| `spark` | `SparkSession` | Active Spark session |
| `ray_ds` | `ray.data.Dataset` | Source Ray Dataset |

Returns: pyspark.sql.DataFrame

## Example

```python
import ray
import drls

ray.init()

# Initialize with an Iceberg hadoop catalog
spark = drls.init_spark(
    "demo", 2, 1, "1G",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/warehouse",
)

# Use standard PySpark
df = spark.range(1000).toDF("id")
df = df.withColumn("squared", df["id"] ** 2)

# Convert to a Ray Dataset for ML
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(ray_ds.count())  # 1000

# Convert back to Spark
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
df2.show(5)

# Clean up
drls.stop_spark()
ray.shutdown()
```