Python API¶
The core Python API provides Spark session management and DataFrame conversions between Spark, Ray, and Pandas.
Session Management¶
init_spark()¶
Initialize a Spark-on-Ray cluster and return an active SparkSession.
```python
import drls

spark = drls.init_spark(
    app_name="my-app",
    num_executors=2,
    executor_cores=1,
    executor_memory="1G",
)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `app_name` | `str` | — | Spark application name |
| `num_executors` | `int` | — | Number of Spark executors (Ray actors) |
| `executor_cores` | `int` | — | CPU cores per executor |
| `executor_memory` | `str \| int` | — | Memory per executor (e.g., `"500M"`, `"2G"`) |
| `enable_hive` | `bool` | `False` | Enable Hive support |
| `fault_tolerant_mode` | `bool` | `True` | Enable fault-tolerant executor restart |
| `placement_group_strategy` | `str \| None` | `None` | Ray placement group strategy |
| `placement_group` | `PlacementGroup \| None` | `None` | Existing placement group |
| `placement_group_bundle_indexes` | `list[int] \| None` | `None` | Bundle indexes for placement |
| `iceberg_catalog` | `str \| None` | `None` | Iceberg catalog type (`hadoop`, `hive`, `rest`, `polaris`, `glue`, `nessie`) |
| `iceberg_warehouse` | `str \| None` | `None` | Warehouse path (e.g., `s3://...`, `/path/...`) |
| `iceberg_catalog_name` | `str` | `"drls"` | Catalog name |
| `iceberg_catalog_uri` | `str \| None` | `None` | Catalog URI (required for `hive`/`rest`/`polaris`/`nessie`) |
| `iceberg_catalog_props` | `dict[str, str] \| None` | `None` | Additional catalog properties |
| `configs` | `dict[str, str] \| None` | `None` | Additional Spark configuration key-value pairs |
Returns: SparkSession
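`executor_memory` accepts either an `int` or a size string such as `"500M"` or `"2G"`. A minimal sketch of how such suffixes are conventionally interpreted (binary multipliers assumed; this helper is illustrative and not part of the drls API):

```python
def memory_to_bytes(value):
    """Interpret an executor memory setting.

    Ints pass through as bytes; strings like "500M" or "2G" are read
    with binary multipliers (an assumption, not drls's actual parser).
    """
    if isinstance(value, int):
        return value
    units = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
    suffix = value[-1].upper()
    if suffix in units:
        return int(value[:-1]) * units[suffix]
    return int(value)  # a plain number given as a string

print(memory_to_bytes("1G"))  # 1073741824
```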
stop_spark()¶
Stop the Spark cluster and clean up Ray resources.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `cleanup_data` | `bool` | `True` | Clean up Ray object store data |
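drls does not, as far as this reference shows, ship a context manager, but the `init_spark`/`stop_spark` pairing is easy to wrap so cleanup always runs. A minimal sketch that takes the start/stop functions as arguments (the wrapper itself is illustrative, not part of the drls API):

```python
from contextlib import contextmanager

@contextmanager
def spark_on_ray(init, stop, **init_kwargs):
    """Yield a session started by `init`, guaranteeing `stop` runs on exit.

    Intended to be called with drls.init_spark / drls.stop_spark as the
    `init` / `stop` callables.
    """
    session = init(**init_kwargs)
    try:
        yield session
    finally:
        stop()

# With drls it would be used like:
# with spark_on_ray(drls.init_spark, drls.stop_spark, app_name="job",
#                   num_executors=2, executor_cores=1,
#                   executor_memory="1G") as spark:
#     spark.range(10).count()
```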
DataFrame Conversions¶
Spark to Ray¶
spark_dataframe_to_ray_dataset()¶
Convert a Spark DataFrame into a Ray Dataset.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `df` | `pyspark.sql.DataFrame` | — | Source Spark DataFrame |
| `parallelism` | `int \| None` | `None` | Target parallelism (defaults to the number of Spark partitions) |

Returns: ray.data.Dataset
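When `parallelism` is left as `None`, the conversion falls back to the DataFrame's own partition count. A sketch of that defaulting rule (the helper is illustrative; in PySpark the partition count itself would come from `df.rdd.getNumPartitions()`):

```python
def resolve_parallelism(requested, num_spark_partitions):
    """Target number of Ray blocks: honour an explicit request,
    otherwise keep one block per Spark partition (the documented default)."""
    return num_spark_partitions if requested is None else requested

print(resolve_parallelism(None, 8))  # 8
print(resolve_parallelism(4, 8))   # 4
```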
Ray to Spark¶
ray_dataset_to_spark_dataframe()¶
Convert a Ray Dataset back into a Spark DataFrame.

| Parameter | Type | Description |
|---|---|---|
| `spark` | `SparkSession` | Active Spark session |
| `ray_ds` | `ray.data.Dataset` | Source Ray Dataset |

Returns: pyspark.sql.DataFrame
Example¶
```python
import ray
import drls

ray.init()

# Initialize with a Hadoop-type Iceberg catalog
spark = drls.init_spark(
    "demo", 2, 1, "1G",
    iceberg_catalog="hadoop",
    iceberg_warehouse="/tmp/warehouse",
)

# Use standard PySpark
df = spark.range(1000).toDF("id")
df = df.withColumn("squared", df["id"] ** 2)

# Convert to a Ray Dataset for ML
ray_ds = drls.spark_dataframe_to_ray_dataset(df)
print(ray_ds.count())  # 1000

# Convert back to Spark
df2 = drls.ray_dataset_to_spark_dataframe(spark, ray_ds)
df2.show(5)

# Clean up
drls.stop_spark()
ray.shutdown()
```