dbt Transforms

DRLS integrates dbt as a first-class pipeline node type, enabling SQL-native transformations alongside Spark and Ray in the same visual DAG. dbt models run as Kubernetes Jobs with project files delivered via ConfigMap — no git clone required.

Architecture

graph LR
    subgraph "Pipeline Builder"
        IS["Iceberg Source"] --> DBT["dbt Node"]
        DBT --> IK["Iceberg Sink"]
    end

    subgraph "K8s Execution"
        CM["ConfigMap<br/>(dbt project files)"] --> Job["K8s Job<br/>(dbt container)"]
        Secret["K8s Secret<br/>(profiles.yml)"] --> Job
        Job -->|dbt build| STS["Spark Thrift Server"]
        STS --> Polaris["Polaris REST Catalog"]
        Polaris --> Iceberg["Iceberg Tables"]
    end

How It Works

  1. Compile time — The pipeline operator embeds the entire dbt/ project directory (models, macros, dbt_project.yml) into its binary
  2. Deploy time — When a pipeline run reaches a dbt step, the operator creates a ConfigMap containing the project files
  3. Init container — A lightweight busybox init container expands the flat ConfigMap into a proper directory tree at /opt/drls/dbt/
  4. Main container — The DRLS dbt image runs the specified command (dbt build, dbt run, dbt test, etc.) against the mounted project
  5. Credentials — profiles.yml is mounted from a Kubernetes Secret at /etc/dbt/

This follows the same ConfigMap delivery pattern used by Spark steps for entrypoints and UDF source files.
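The steps above can be sketched as a single Job manifest. This is an illustrative approximation only — the resource names (drls-dbt-step, dbt-project-files) and labels are assumptions, not the operator's actual output, and expanding a flat ConfigMap into a nested tree is simplified to a plain copy:

```yaml
# Hypothetical sketch of the Job the operator creates for a dbt step.
apiVersion: batch/v1
kind: Job
metadata:
  name: drls-dbt-step        # illustrative name
spec:
  template:
    spec:
      initContainers:
        - name: expand-project
          image: busybox
          # Copy flat ConfigMap keys into a real directory tree (simplified)
          command: ["sh", "-c", "cp -rL /config/. /opt/drls/dbt/"]
          volumeMounts:
            - {name: project, mountPath: /config}
            - {name: workdir, mountPath: /opt/drls/dbt}
      containers:
        - name: dbt
          image: drls/dbt:latest
          args: ["build", "--select", "tag:ecommerce"]
          volumeMounts:
            - {name: workdir, mountPath: /opt/drls/dbt}
            - {name: profiles, mountPath: /etc/dbt, readOnly: true}
      volumes:
        - name: project
          configMap: {name: dbt-project-files}   # project files
        - name: profiles
          secret: {secretName: dbt-profiles}     # profiles.yml
        - name: workdir
          emptyDir: {}
      restartPolicy: Never
```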

Pipeline Builder

Adding a dbt Node

Drag the dbt node (orange, GitBranch icon) from the node palette onto the canvas.

Configuration

| Field | Description | Example |
|-------|-------------|---------|
| Command | dbt subcommand to run | build, run, test, seed, snapshot, compile |
| Selector | Model selector passed to --select | stg_orders stg_customers, tag:ecommerce, fct_daily_revenue+ |
| Target | dbt target name passed to --target | prod, dev |
| Threads | Concurrency passed to --threads | 4 |
| Full Refresh | Adds the --full-refresh flag | Checkbox |
| Fail Fast | Adds the --fail-fast flag | Checkbox |
| Profiles Secret | K8s Secret containing profiles.yml | dbt-profiles |
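As a rough sketch of how these fields might serialize into a pipeline step, assuming hypothetical field names (the actual DRLS CRD schema may differ):

```yaml
# Illustrative only — field names are assumptions, not the actual CRD schema.
- name: build-marts
  type: dbt
  command: build
  selector: tag:ecommerce
  target: prod
  threads: 4
  fullRefresh: false
  failFast: true
  profilesSecret: dbt-profiles
```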

Selector Syntax

dbt selectors determine which models to run. Common patterns:

# Run specific models
dbt run --select stg_orders stg_customers

# Run all models with a tag
dbt build --select tag:ecommerce

# Run a model and all downstream dependencies
dbt run --select stg_orders+

# Run everything in a directory
dbt run --select models/ecommerce/marts/

dbt Project Structure

Models live in the dbt/ directory at the repo root:

dbt/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── ecommerce/                 # E-commerce analytics
│   │   ├── staging/
│   │   │   ├── sources.yml        # Iceberg table source definitions
│   │   │   ├── stg_orders.sql     # Cleaned orders view
│   │   │   └── stg_customers.sql  # Cleaned customers view
│   │   ├── marts/
│   │   │   ├── fct_daily_revenue.sql      # Daily revenue (incremental)
│   │   │   └── dim_customer_metrics.sql   # Customer RFM segmentation
│   │   └── schema.yml             # Model docs + tests
│   ├── cost/                      # AWS cost attribution
│   │   ├── aws_cur_base.sql
│   │   ├── eks_namespace_costs.sql
│   │   └── ...
│   └── staging/
│       └── sources.yml            # Billing source definitions
└── macros/                        # Reusable SQL macros

Sources

dbt sources map to existing Iceberg tables managed through the Polaris catalog:

# dbt/models/ecommerce/staging/sources.yml
version: 2

sources:
  - name: demo
    schema: demo
    tables:
      - name: orders
        description: "E-commerce orders with status, amounts, and shipping info"
        columns:
          - name: order_id
            tests: [not_null, unique]
          - name: customer_id
            tests: [not_null]

      - name: customers
        description: "Customer profiles with lifetime value and activity tracking"

Staging Models

Staging models clean and normalize raw data. They're materialized as views (no storage cost):

-- dbt/models/ecommerce/staging/stg_orders.sql
{{ config(materialized='view', tags=['ecommerce']) }}

select
    order_id,
    customer_id,
    cast(order_date as date) as order_date,
    lower(status) as status,
    total_amount,
    items_count,
    lower(shipping_country) as shipping_country,
    lower(payment_method) as payment_method,
    created_at
from {{ source('demo', 'orders') }}
where lower(status) != 'cancelled'
  and total_amount > 0

Mart Models

Mart models produce business-ready tables. Use incremental materialization for large datasets:

-- dbt/models/ecommerce/marts/fct_daily_revenue.sql
{{ config(materialized='incremental', unique_key=['order_date'], tags=['ecommerce']) }}

select
    o.order_date,
    count(distinct o.order_id) as total_orders,
    count(distinct o.customer_id) as unique_customers,
    sum(o.total_amount) as gross_revenue,
    avg(o.total_amount) as avg_order_value,
    sum(o.items_count) as total_items
from {{ ref('stg_orders') }} o

{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}

group by 1

Data Quality Tests

Define tests in schema.yml — they run as a dbt step in the pipeline:

models:
  - name: fct_daily_revenue
    columns:
      - name: order_date
        tests: [not_null, unique]
      - name: gross_revenue
        tests: [not_null]
      - name: total_orders
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 1

  - name: dim_customer_metrics
    columns:
      - name: customer_tier
        tests:
          - accepted_values:
              values: ['vip', 'regular', 'new', 'inactive']

Profiles & Credentials

dbt connects to Iceberg tables through a Spark Thrift Server that uses the Polaris REST catalog.

Spark Target (Production)

# profiles.yml
drls:
  target: prod
  outputs:
    prod:
      type: spark
      method: thrift
      host: "{{ env_var('DBT_SPARK_HOST', 'localhost') }}"
      port: 10000
      schema: billing
      connect_retries: 3
      connect_timeout: 30

DuckDB Target (Local Dev)

    dev:
      type: duckdb
      path: "{{ env_var('DBT_DUCKDB_PATH', 'target/drls_dev.duckdb') }}"
      schema: billing

Kubernetes Secret

Store credentials as a K8s Secret referenced by the dbt pipeline node:

kubectl create secret generic dbt-profiles \
  --from-file=profiles.yml=dbt/profiles.yml \
  -n default-drls-clusters

Custom Container

The DRLS dbt image (docker/dbt.Dockerfile) includes:

  • Python 3.12 base
  • dbt-spark (PyHive adapter) — connects to Spark Thrift Server
  • dbt-postgres — for metadata/reporting databases
  • PyIceberg — direct Polaris REST catalog access
  • drls Python package — custom offline store, utilities, macros

The Dockerfile, condensed:
FROM python:3.12-slim

RUN pip install --no-cache-dir \
    "dbt-spark[PyHive]==1.9.*" \
    dbt-postgres==1.9.* \
    "pyiceberg[rest,s3fs]>=0.11"

# Install drls Python package
ARG DRLS_WHEEL
COPY ${DRLS_WHEEL:-dist/drls-*.whl} /tmp/
RUN pip install --no-cache-dir --no-deps /tmp/drls-*.whl 2>/dev/null || true

ENV DBT_PROJECT_DIR=/opt/drls/dbt \
    DBT_PROFILES_DIR=/etc/dbt
WORKDIR /opt/drls/dbt
ENTRYPOINT ["dbt"]

Example Pipelines

E-Commerce Analytics (Pure dbt)

A medallion architecture pipeline transforming demo tables into analytics-ready marts:

graph LR
    O["Iceberg Source<br/>demo.orders"] --> STG["dbt run<br/>stg_orders + stg_customers"]
    C["Iceberg Source<br/>demo.customers"] --> STG
    STG --> MART["dbt build<br/>fct_daily_revenue + dim_customer_metrics"]
    MART --> R["Iceberg Sink<br/>analytics.fct_daily_revenue"]
    MART --> M["Iceberg Sink<br/>analytics.dim_customer_metrics"]

What it demonstrates:

  • Two dbt steps in sequence (staging views → mart tables)
  • Source definitions pointing to Polaris-managed Iceberg tables
  • Incremental materialization for fct_daily_revenue
  • RFM customer segmentation in dim_customer_metrics

Hybrid Spark + dbt

Combines Spark for real-time ingestion with dbt for batch aggregation:

graph LR
    K["Kafka Source<br/>order-events"] --> S["Spark Streaming<br/>Validate & Enrich"]
    S --> RAW["Iceberg Sink<br/>raw.validated_orders"]
    S --> DBT["dbt build<br/>tag:ecommerce"]
    DBT --> A["Iceberg Sink<br/>analytics.fct_daily_revenue"]

What it demonstrates:

  • Spark handles streaming ingest (validation, enrichment)
  • dbt handles batch transforms (aggregation, business logic)
  • Both share the same Iceberg catalog via Polaris
  • DAG edges enforce execution order between node types

dbt Data Quality Suite

A single-step pipeline for scheduled data quality checks:

graph LR
    T["Iceberg Source<br/>demo.*"] --> DQ["dbt test<br/>tag:ecommerce<br/>--fail-fast"]

Run dbt test on a schedule to validate data quality constraints defined in schema.yml.
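If you prefer scheduling outside the Pipeline Builder, a plain Kubernetes CronJob can invoke the same image. This sketch is an assumption, not DRLS output — it reuses the image and Secret names from the sections above and assumes the dbt project is baked into the image or mounted separately:

```yaml
# Hypothetical CronJob running the quality suite nightly.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: dbt-quality-suite     # illustrative name
spec:
  schedule: "0 2 * * *"       # 02:00 daily
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: dbt
              image: drls/dbt:latest
              args: ["test", "--select", "tag:ecommerce", "--fail-fast"]
              volumeMounts:
                - name: profiles
                  mountPath: /etc/dbt
                  readOnly: true
          volumes:
            - name: profiles
              secret:
                secretName: dbt-profiles
          restartPolicy: Never
```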

Seeding Demo Pipelines

Load the example pipeline definitions into your DRLS instance:

# Port-forward to the backend
kubectl port-forward svc/ui-server 3000:3000 -n core-services &

# Seed demo pipelines
python scripts/seed_demo_pipelines.py

This creates three pipeline definitions visible in the Pipeline Builder UI.

Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| DEFAULT_DBT_IMAGE | drls/dbt:latest | Default container image for dbt steps |
| DBT_PROFILES_DIR | /etc/dbt | Profiles directory inside the container |
| DBT_PROJECT_DIR | /opt/drls/dbt | Project directory inside the container |
| DBT_TARGET | (none) | dbt target name (overrides the profiles.yml default) |
| DBT_SPARK_HOST | localhost | Spark Thrift Server hostname (used in profiles.yml) |
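These variables are set on the dbt container. For example, a step's env block overriding the target and Thrift host might look like this (the hostname is illustrative, not a DRLS default):

```yaml
env:
  - name: DBT_TARGET
    value: prod
  - name: DBT_SPARK_HOST
    value: spark-thrift.core-services.svc.cluster.local  # example hostname
```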

Node Type Mapping

| Pipeline Builder Node | CRD StepType | K8s Resource | Runner |
|-----------------------|--------------|--------------|--------|
| sparkStream | SparkStreaming | Pod + Service | Spark driver (native K8s scheduler) |
| rayProcess | RayTrain | RayJob CRD | KubeRay |
| dbt | Dbt | Job + ConfigMap | dbt CLI in custom container |
| custom | Custom | Job | Any container |