UDF Registry¶
The UDF Registry lets you author, test, version, and publish Python user-defined functions (UDFs) directly from the management console. Published UDFs are automatically registered with Spark at pipeline runtime, so they can be used in SQL expressions and pipeline transforms.
Writing a Spark-Compatible UDF¶
DRLS UDFs must follow Spark's calling convention: positional arguments matching DataFrame column names. Spark calls your function once per row, passing each column value as a separate positional argument.
Basic Structure¶
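At its simplest, a UDF takes one positional parameter named after the column it reads and returns one value per row. As an illustrative sketch (the function name `to_upper` is hypothetical, not part of the registry):

```python
def to_upper(value: str) -> str:
    """Uppercase a single string column."""
    if value is None:
        return None  # Spark passes SQL NULL as Python None; pass it through
    return value.upper()
```

Spark calls this once per row with the column value as the single positional argument.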
Multi-Column UDFs¶
When your UDF operates on multiple columns, define one parameter per column. Spark passes values in parameter-declaration order, so parameter names must match the DataFrame column names you intend to use:
```python
def full_name(first_name: str, last_name: str) -> str:
    """Combine first and last name columns."""
    return f"{first_name} {last_name}"
```
Use positional parameters only
Spark always passes column values as positional arguments — never as keyword arguments. Avoid `*args` or `**kwargs` in your function signature. The test runner mirrors this behavior, so a UDF that passes testing will work identically when registered with Spark.
Rules for Spark Compatibility¶
- One function per UDF — The source code is `exec()`'d and the function matching the UDF name is extracted.
- Pure Python — UDFs are executed by Python worker processes on the Spark executors. You can import standard library modules, but heavy dependencies (NumPy, Pandas) must be available on all executor nodes.
- Positional parameters — Each parameter maps to a DataFrame column. Use clear, descriptive names that match your column names.
- Return a single value — Spark UDFs return one value per row. The return type must match what you declare in the registry.
- No side effects — UDFs may be retried or executed out of order. Avoid writing files, making network calls, or modifying global state.
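To make the rules concrete, here is an illustrative pair (the names are hypothetical): a compliant pure UDF next to a commented-out pattern the rules disallow:

```python
# Compatible: one positional parameter per column, one value out, no side effects.
def discount_price(price: float, discount_pct: float) -> float:
    """Apply a percentage discount to a price column."""
    return price * (1.0 - discount_pct / 100.0)

# Not compatible: *args hides the column mapping, and writing to a file
# is a side effect that may repeat if Spark retries the task.
# def bad_udf(*args):
#     with open("/tmp/udf.log", "a") as f:
#         f.write(str(args))
```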
Supported Return Types¶
The UDF Registry accepts common type names and automatically maps them to valid Spark DDL type strings during registration:
| Registry Value | Spark DDL Type | Description |
|---|---|---|
| `string` | `string` | Text/varchar |
| `int` | `integer` | 32-bit integer |
| `integer` | `integer` | 32-bit integer |
| `long` | `bigint` | 64-bit integer |
| `bigint` | `bigint` | 64-bit integer |
| `double` | `double` | 64-bit floating point |
| `float` | `float` | 32-bit floating point |
| `boolean` | `boolean` | True/false |
| `short` | `smallint` | 16-bit integer |
| `byte` | `tinyint` | 8-bit integer |
| `binary` | `binary` | Raw bytes |
| `date` | `date` | Calendar date |
| `timestamp` | `timestamp` | Date + time |
Type mapping is case-insensitive
You can enter `int`, `Int`, or `INT` — the registry normalizes to lowercase before mapping. If you enter a type not in the table (e.g., `array<string>`), it is passed through to Spark as-is.
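The mapping behavior above can be sketched as follows (the dict and function here are illustrative, not the registry's actual code):

```python
# Hypothetical sketch of the registry's type normalization.
TYPE_MAP = {
    "string": "string", "int": "integer", "integer": "integer",
    "long": "bigint", "bigint": "bigint", "double": "double",
    "float": "float", "boolean": "boolean", "short": "smallint",
    "byte": "tinyint", "binary": "binary", "date": "date",
    "timestamp": "timestamp",
}

def to_spark_ddl(registry_value: str) -> str:
    """Lowercase before lookup; unknown types pass through to Spark as-is."""
    key = registry_value.strip().lower()
    return TYPE_MAP.get(key, registry_value)
```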
Creating a UDF¶
- Navigate to UDF Registry in the sidebar.
- Click New UDF.
- Fill in the metadata:
- Name — A unique identifier (lowercase, used as the Spark function name).
- Description — What the UDF does.
- Function Name — The Python function name in your source code (defaults to the UDF name).
- Return Type — Select from the supported types above.
- Tags — Optional comma-separated labels for filtering.
- Write your Python code in the Monaco editor. A starter template is provided.
- Click Save to create the UDF with an initial draft version.
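The starter template follows the calling convention described above. For illustration (a sketch, not necessarily the console's exact template), it has this shape:

```python
def my_udf(input_value: str) -> str:
    """Describe what this UDF does.

    Each parameter corresponds to a DataFrame column,
    passed positionally by Spark, one row at a time.
    """
    # Replace with your transformation logic.
    return input_value
```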
Testing a UDF¶
The built-in test runner executes your UDF against sample data using the same calling convention as Spark — positional arguments extracted from row objects in parameter-declaration order.
Step-by-Step¶
- Open your UDF in the editor.
- Expand the Test panel at the bottom.
- Enter sample input as a JSON array. Each element represents one row.
- Click Run Test.
- Review the results:
- Green — All rows processed successfully. Output values are displayed.
- Red — An error occurred. The full Python traceback is shown.
- Duration — Execution time in milliseconds.
10-second timeout
Tests are limited to 10 seconds. If your UDF takes longer (e.g., heavy computation or an infinite loop), the test is terminated and an error is returned.
How Testing Mirrors Spark¶
The test runner replicates Spark's UDF dispatch:
| Input Format | How Arguments Are Passed |
|---|---|
| Dict row `{"a": 1, "b": 2}` | Values extracted in parameter-declaration order via `inspect.signature()`, then called as `func(row["a"], row["b"])` |
| Array row `[1, 2]` | Passed as positional: `func(1, 2)` |
| Scalar `"hello"` | Single argument: `func("hello")` |
This ensures that if a UDF passes testing, it will work when registered with Spark. The key guarantee is that dict rows use parameter names to determine argument order — not dict insertion order — which matches how Spark maps DataFrame columns to UDF parameters.
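A sketch of that dispatch logic (hypothetical code, assuming the behavior in the table above):

```python
import inspect

def dispatch(func, row):
    """Call func the way the test runner does: dict rows are unpacked in
    parameter-declaration order, array rows positionally, scalars as one arg."""
    if isinstance(row, dict):
        params = inspect.signature(func).parameters
        args = [row[name] for name in params]  # order from the signature, not the dict
        return func(*args)
    if isinstance(row, (list, tuple)):
        return func(*row)
    return func(row)

def full_name(first_name, last_name):
    return f"{first_name} {last_name}"

# Dict keys in "wrong" order still map correctly by parameter name:
dispatch(full_name, {"last_name": "Doe", "first_name": "Jane"})  # → "Jane Doe"
```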
Version Lifecycle¶
Each UDF can have multiple versions, following a managed lifecycle:
```mermaid
graph LR
    draft["Draft"] -->|publish| published["Published"]
    published -->|deprecate| deprecated["Deprecated"]
```
Draft¶
- Editable — you can update the source code and re-test.
- Not available for use in pipelines.
- Created automatically when you save a new UDF or create a new version.
Published¶
- Immutable — source code is locked.
- Available for use in pipeline builder `udfTransform` nodes.
- Registered with Spark at pipeline runtime.
- Only one published version is typically active (the latest).
Deprecated¶
- No longer available for new pipeline deployments.
- Existing pipelines using this version continue to work.
- Useful for phasing out old implementations.
Creating a New Version¶
- Open the UDF in the editor.
- Click New Version in the version history panel.
- The source code from the latest version is copied into a new draft.
- Edit, test, and publish when ready.
RBAC Permissions¶
| Action | Minimum Role |
|---|---|
| View UDFs and versions | Auditor |
| Create UDFs | Data Engineer |
| Edit UDF metadata | Data Engineer |
| Create new versions | Data Engineer |
| Test UDFs | Data Engineer |
| Publish versions | Data Engineer |
| Deprecate versions | Lead Data Engineer |
| Delete UDFs | Lead Data Engineer |
Delete restrictions
A UDF can only be deleted if it has no published versions. Deprecate all published versions first.
Pipeline Integration (Phase 2)¶
When a pipeline is deployed, published UDFs are automatically registered with Spark:
- The pipeline operator fetches published UDF source code from the registry.
- Source code is base64-encoded and injected as environment variables (`RLS_UDF_{NAME}_SOURCE`, `RLS_UDF_{NAME}_RETURN_TYPE`) into the SparkApplication CRD.
- At Spark startup, the `register_from_env()` hook scans for `RLS_UDF_*` variables, decodes the source, and calls `spark.udf.register()`.
- The UDF is then available in SQL expressions: `SELECT my_udf(column_name) FROM table`.
This approach requires no changes to the Spark image — UDFs are injected dynamically via environment variables.
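As a sketch of the decode step (a hypothetical helper; the real `register_from_env()` hook would additionally `exec()` the decoded source and call `spark.udf.register()`, omitted here so the snippet runs without Spark):

```python
import base64
import os

PREFIX = "RLS_UDF_"

def collect_udfs_from_env(environ=os.environ):
    """Scan for RLS_UDF_{NAME}_SOURCE variables and decode each UDF's
    source text and declared return type into a dict keyed by UDF name."""
    udfs = {}
    for key, value in environ.items():
        if key.startswith(PREFIX) and key.endswith("_SOURCE"):
            name = key[len(PREFIX):-len("_SOURCE")].lower()
            source = base64.b64decode(value).decode("utf-8")
            return_type = environ.get(f"{PREFIX}{name.upper()}_RETURN_TYPE", "string")
            udfs[name] = (source, return_type)
    return udfs
```

The default return type of `"string"` here is an assumption for the sketch, not documented behavior.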
Example: End-to-End Walkthrough¶
1. Create a UDF¶
Navigate to UDF Registry → New UDF and enter:
- Name: `mask_email`
- Description: Masks the local part of an email address
- Return Type: `string`
Write the source code:
```python
def mask_email(email: str) -> str:
    """Mask the local part of an email, keeping first and last char."""
    if not email or "@" not in email:
        return email
    local, domain = email.rsplit("@", 1)
    if len(local) <= 2:
        masked = local[0] + "*"
    else:
        masked = local[0] + "*" * (len(local) - 2) + local[-1]
    return f"{masked}@{domain}"
```
2. Test It¶
In the test panel, enter sample input as a JSON array, then click Run Test and review the output.
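For illustration, here are sample rows (scalar input format) and the masked values they produce; the function is repeated so the snippet is self-contained:

```python
def mask_email(email: str) -> str:
    """Same implementation as the walkthrough above."""
    if not email or "@" not in email:
        return email
    local, domain = email.rsplit("@", 1)
    if len(local) <= 2:
        masked = local[0] + "*"
    else:
        masked = local[0] + "*" * (len(local) - 2) + local[-1]
    return f"{masked}@{domain}"

# Sample test-panel rows and the output for each:
samples = ["jane.doe@example.com", "ab@example.com", "not-an-email"]
print([mask_email(s) for s in samples])
# → ['j******e@example.com', 'a*@example.com', 'not-an-email']
```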
3. Publish¶
Click Publish to lock the version and make it available for pipelines.
4. Use in a Pipeline (Phase 2)¶
In the pipeline builder, add a UDF Transform node and select mask_email. The node applies the UDF to the configured input column in your streaming pipeline.