UDF Registry

The UDF Registry lets you author, test, version, and publish Python user-defined functions (UDFs) directly from the management console. Published UDFs are automatically registered with Spark at pipeline runtime, so they can be used in SQL expressions and pipeline transforms.

Writing a Spark-Compatible UDF

DRLS UDFs must follow Spark's calling convention: positional arguments matching DataFrame column names. Spark calls your function once per row, passing each column value as a separate positional argument.

Basic Structure

def my_udf(value: str) -> str:
    """Transform a single column value."""
    return value.upper()

Multi-Column UDFs

When your UDF operates on multiple columns, define one parameter per column. Spark passes values in parameter-declaration order, so parameter names must match the DataFrame column names you intend to use:

def full_name(first_name: str, last_name: str) -> str:
    """Combine first and last name columns."""
    return f"{first_name} {last_name}"

Use positional parameters only

Spark always passes column values as positional arguments — never as keyword arguments. Avoid *args or **kwargs in your function signature. The test runner mirrors this behavior, so a UDF that passes testing will work identically when registered with Spark.
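To see why keyword arguments are incompatible, consider this sketch of positional dispatch (the function names and sample row are illustrative, not part of the registry):

```python
def good_udf(first_name, last_name):
    """Positional parameters: compatible with Spark's dispatch."""
    return f"{first_name} {last_name}"

def bad_udf(*, first_name, last_name):
    """Keyword-only parameters: cannot be called positionally."""
    return f"{first_name} {last_name}"

# Spark-style dispatch is always positional, never keyword:
row = ("Alice", "Smith")
print(good_udf(*row))  # Alice Smith

try:
    bad_udf(*row)
except TypeError as exc:
    print(f"rejected: {exc}")  # takes 0 positional arguments but 2 were given
```

Parameter names still matter for mapping columns to arguments; they just are not used at call time.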

Rules for Spark Compatibility

  1. One function per UDF — The source code is exec()'d and the function matching the UDF name is extracted.
  2. Pure Python — UDFs run in Python worker processes on the Spark executors, not in the JVM itself. You can import standard library modules, but heavy dependencies (NumPy, Pandas) must be installed on every executor node.
  3. Positional parameters — Each parameter maps to a DataFrame column. Use clear, descriptive names that match your column names.
  4. Return a single value — Spark UDFs return one value per row. The return type must match what you declare in the registry.
  5. No side effects — UDFs may be retried or executed out of order. Avoid writing files, making network calls, or modifying global state.
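The extraction described in rule 1 can be sketched as follows (`load_udf` is a hypothetical helper, not the registry's actual implementation):

```python
def load_udf(source: str, function_name: str):
    """Exec the UDF source in an isolated namespace and pull out the function."""
    namespace: dict = {}
    exec(source, namespace)  # registry-controlled source, per rule 1
    func = namespace.get(function_name)
    if not callable(func):
        raise ValueError(f"source does not define a function named {function_name!r}")
    return func

source = '''
def shout(value: str) -> str:
    return value.upper()
'''
shout = load_udf(source, "shout")
print(shout("hello"))  # HELLO
```

This is also why the Function Name field must match a `def` in your source: the lookup is by name in the exec'd namespace.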

Supported Return Types

The UDF Registry accepts common type names and automatically maps them to valid Spark DDL type strings during registration:

Registry Value   Spark DDL Type   Description
--------------   --------------   ----------------------
string           string           Text/varchar
int              integer          32-bit integer
integer          integer          32-bit integer
long             bigint           64-bit integer
bigint           bigint           64-bit integer
double           double           64-bit floating point
float            float            32-bit floating point
boolean          boolean          True/false
short            smallint         16-bit integer
byte             tinyint          8-bit integer
binary           binary           Raw bytes
date             date             Calendar date
timestamp        timestamp        Date + time

Type mapping is case-insensitive

You can enter int, Int, or INT — the registry normalizes to lowercase before mapping. If you enter a type not in the table (e.g., array<string>), it is passed through to Spark as-is.
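The normalization described above can be sketched like this (the mapping comes from the table; the helper name is illustrative):

```python
# Registry value -> Spark DDL type, taken from the table above.
TYPE_MAP = {
    "string": "string", "int": "integer", "integer": "integer",
    "long": "bigint", "bigint": "bigint", "double": "double",
    "float": "float", "boolean": "boolean", "short": "smallint",
    "byte": "tinyint", "binary": "binary", "date": "date",
    "timestamp": "timestamp",
}

def normalize_return_type(registry_value: str) -> str:
    """Lowercase known names for mapping; pass unknown types through as-is."""
    key = registry_value.strip().lower()
    return TYPE_MAP.get(key, registry_value)

print(normalize_return_type("INT"))            # integer
print(normalize_return_type("array<string>"))  # array<string>
```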

Creating a UDF

  1. Navigate to UDF Registry in the sidebar.
  2. Click New UDF.
  3. Fill in the metadata:
    • Name — A unique identifier (lowercase, used as the Spark function name).
    • Description — What the UDF does.
    • Function Name — The Python function name in your source code (defaults to the UDF name).
    • Return Type — Select from the supported types above.
    • Tags — Optional comma-separated labels for filtering.
  4. Write your Python code in the Monaco editor. A starter template is provided:

    def my_udf(value: str) -> str:
        """Transform a single value."""
        return value.upper()
    
  5. Click Save to create the UDF with an initial draft version.

Testing a UDF

The built-in test runner executes your UDF against sample data using the same calling convention as Spark — positional arguments extracted from row objects in parameter-declaration order.

Step-by-Step

  1. Open your UDF in the editor.
  2. Expand the Test panel at the bottom.
  3. Enter sample input as a JSON array. Each element represents one row. Three row formats are accepted:

    Dict rows (values matched to parameters by column name):

    [
        {"first_name": "Alice", "last_name": "Smith"},
        {"first_name": "Bob", "last_name": "Jones"}
    ]

    Scalar rows (for single-parameter UDFs):

    ["hello", "world", "spark"]

    Array rows (values passed positionally, in order):

    [
        ["Alice", "Smith"],
        ["Bob", "Jones"]
    ]
    
  4. Click Run Test.

  5. Review the results:
    • Green — All rows processed successfully. Output values are displayed.
    • Red — An error occurred. The full Python traceback is shown.
    • Duration — Execution time in milliseconds.

10-second timeout

Tests are limited to 10 seconds. If your UDF takes longer (e.g., heavy computation or an infinite loop), the test is terminated and an error is returned.
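One way such a limit can be enforced (a sketch only, not necessarily the runner's actual mechanism) is to run the UDF in a worker and abandon the result after the deadline:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

TIMEOUT_SECONDS = 10  # the registry's documented limit

def run_with_timeout(func, args, timeout=TIMEOUT_SECONDS):
    """Run func(*args) in a worker thread; raise if it exceeds the deadline.

    A production runner would use a subprocess so a runaway UDF can actually
    be killed -- a thread cannot be forcibly stopped.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(func, *args)
        try:
            return future.result(timeout=timeout)
        except FutureTimeout:
            raise TimeoutError(f"UDF exceeded {timeout}s limit")

print(run_with_timeout(str.upper, ("fast",)))  # FAST
```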

How Testing Mirrors Spark

The test runner replicates Spark's UDF dispatch:

Input Format                How Arguments Are Passed
Dict row {"a": 1, "b": 2}   Extracts values in parameter-declaration order via inspect.signature(), then calls func(row["a"], row["b"])
Array row [1, 2]            Passed as positional: func(1, 2)
Scalar "hello"              Single argument: func("hello")

This ensures that if a UDF passes testing, it will work when registered with Spark. The key guarantee is that dict rows use parameter names to determine argument order — not dict insertion order — which matches how Spark maps DataFrame columns to UDF parameters.
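The dispatch rules in the table above can be sketched as a single helper (`call_like_spark` is hypothetical; the rules are the documented ones):

```python
import inspect

def call_like_spark(func, row):
    """Dispatch one test row to a UDF per the three documented row formats."""
    if isinstance(row, dict):
        # Argument order comes from the UDF's parameter declaration order,
        # not from dict insertion order.
        params = inspect.signature(func).parameters
        return func(*(row[name] for name in params))
    if isinstance(row, (list, tuple)):
        return func(*row)   # array row: positional
    return func(row)        # scalar: single argument

def full_name(first_name, last_name):
    return f"{first_name} {last_name}"

# Dict keys supplied out of declaration order still map correctly:
print(call_like_spark(full_name, {"last_name": "Smith", "first_name": "Alice"}))
# Alice Smith
```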

Version Lifecycle

Each UDF can have multiple versions, following a managed lifecycle:

graph LR
    draft["Draft"] -->|publish| published["Published"]
    published -->|deprecate| deprecated["Deprecated"]

Draft

  • Editable — you can update the source code and re-test.
  • Not available for use in pipelines.
  • Created automatically when you save a new UDF or create a new version.

Published

  • Immutable — source code is locked.
  • Available for use in pipeline builder udfTransform nodes.
  • Registered with Spark at pipeline runtime.
  • Only one published version is typically active (the latest).

Deprecated

  • No longer available for new pipeline deployments.
  • Existing pipelines using this version continue to work.
  • Useful for phasing out old implementations.

Creating a New Version

  1. Open the UDF in the editor.
  2. Click New Version in the version history panel.
  3. The source code from the latest version is copied into a new draft.
  4. Edit, test, and publish when ready.

RBAC Permissions

Action                  Minimum Role
View UDFs and versions  Auditor
Create UDFs             Data Engineer
Edit UDF metadata       Data Engineer
Create new versions     Data Engineer
Test UDFs               Data Engineer
Publish versions        Data Engineer
Deprecate versions      Lead Data Engineer
Delete UDFs             Lead Data Engineer

Delete restrictions

A UDF can only be deleted if it has no published versions. Deprecate all published versions first.

Pipeline Integration (Phase 2)

When a pipeline is deployed, published UDFs are automatically registered with Spark:

  1. The pipeline operator fetches published UDF source code from the registry.
  2. Source code is base64-encoded and injected as environment variables (RLS_UDF_{NAME}_SOURCE, RLS_UDF_{NAME}_RETURN_TYPE) into the SparkApplication CRD.
  3. At Spark startup, the register_from_env() hook scans for RLS_UDF_* variables, decodes the source, and calls spark.udf.register().
  4. The UDF is then available in SQL expressions: SELECT my_udf(column_name) FROM table.

This approach requires no changes to the Spark image — UDFs are injected dynamically via environment variables.
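The decode-and-register step can be sketched as follows. This is a hypothetical rendering of the `register_from_env()` hook: the real `spark.udf.register()` call is replaced with a plain dict so the sketch runs without a Spark session, and it assumes the function name matches the UDF name (the registry's default):

```python
import base64
import os

registered = {}  # stand-in for spark.udf.register()

def register_from_env(env=None):
    """Scan RLS_UDF_*_SOURCE variables, decode each, and register the UDF."""
    env = os.environ if env is None else env
    prefix, suffix = "RLS_UDF_", "_SOURCE"
    for key, value in env.items():
        if not (key.startswith(prefix) and key.endswith(suffix)):
            continue
        name = key[len(prefix):-len(suffix)].lower()
        source = base64.b64decode(value).decode("utf-8")
        return_type = env.get(f"{prefix}{name.upper()}_RETURN_TYPE", "string")
        namespace: dict = {}
        exec(source, namespace)
        registered[name] = (namespace[name], return_type)
        # With a live session this would be:
        # spark.udf.register(name, namespace[name], return_type)

src = base64.b64encode(b"def shout(v):\n    return v.upper()\n").decode()
register_from_env({"RLS_UDF_SHOUT_SOURCE": src,
                   "RLS_UDF_SHOUT_RETURN_TYPE": "string"})
func, rtype = registered["shout"]
print(func("hi"), rtype)  # HI string
```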

Example: End-to-End Walkthrough

1. Create a UDF

Navigate to UDF Registry → New UDF and enter:

  • Name: mask_email
  • Description: Masks the local part of an email address
  • Return Type: string

Write the source code:

def mask_email(email: str) -> str:
    """Mask the local part of an email, keeping first and last char."""
    if not email or "@" not in email:
        return email
    local, domain = email.rsplit("@", 1)
    if len(local) <= 2:
        masked = local[0] + "*"
    else:
        masked = local[0] + "*" * (len(local) - 2) + local[-1]
    return f"{masked}@{domain}"

2. Test It

In the test panel, enter:

[
    {"email": "alice.smith@example.com"},
    {"email": "bob@example.com"},
    {"email": "ab@example.com"}
]

Click Run Test. Expected output:

["a*********h@example.com", "b*b@example.com", "a*@example.com"]

3. Publish

Click Publish to lock the version and make it available for pipelines.

4. Use in a Pipeline (Phase 2)

In the pipeline builder, add a UDF Transform node and select mask_email. The node applies the UDF to the configured input column in your streaming pipeline.