dbt Transforms¶
DRLS integrates dbt as a first-class pipeline node type, enabling SQL-native transformations alongside Spark and Ray in the same visual DAG. dbt models run as Kubernetes Jobs with project files delivered via ConfigMap — no git clone required.
Architecture¶
graph LR
subgraph "Pipeline Builder"
IS["Iceberg Source"] --> DBT["dbt Node"]
DBT --> IK["Iceberg Sink"]
end
subgraph "K8s Execution"
CM["ConfigMap<br/>(dbt project files)"] --> Job["K8s Job<br/>(dbt container)"]
Secret["K8s Secret<br/>(profiles.yml)"] --> Job
Job -->|dbt build| STS["Spark Thrift Server"]
STS --> Polaris["Polaris REST Catalog"]
Polaris --> Iceberg["Iceberg Tables"]
end
How It Works¶
- Compile time — The pipeline operator embeds the entire `dbt/` project directory (models, macros, `dbt_project.yml`) into its binary
- Deploy time — When a pipeline run reaches a dbt step, the operator creates a ConfigMap containing the project files
- Init container — A lightweight `busybox` init container expands the flat ConfigMap into a proper directory tree at `/opt/drls/dbt/`
- Main container — The DRLS dbt image runs the specified command (`dbt build`, `dbt run`, `dbt test`, etc.) against the mounted project
- Credentials — `profiles.yml` is mounted from a Kubernetes Secret at `/etc/dbt/`
This follows the same ConfigMap delivery pattern used by Spark steps for entrypoints and UDF source files.
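The resources generated for a dbt step might look roughly like the sketch below. All names, the flat-key convention for nested model paths, and the init-container command are illustrative assumptions, not actual DRLS output:

```yaml
# Hypothetical sketch of the per-step resources.
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-pipeline-dbt-step          # hypothetical name
data:
  dbt_project.yml: |
    name: drls
    profile: drls
  # Nested paths flattened into a single key (assumed convention)
  models__ecommerce__staging__stg_orders.sql: |
    {{ config(materialized='view') }}
    select * from {{ source('demo', 'orders') }}
---
apiVersion: batch/v1
kind: Job
metadata:
  name: my-pipeline-dbt-step          # hypothetical name
spec:
  template:
    spec:
      initContainers:
        - name: unpack-project
          image: busybox
          # Expands flat ConfigMap keys back into a directory tree (sketch)
          command: ["sh", "-c", "cp -r /config/. /opt/drls/dbt/"]
```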
Pipeline Builder¶
Adding a dbt Node¶
Drag the dbt node (orange, GitBranch icon) from the node palette onto the canvas.
Configuration¶
| Field | Description | Example |
|---|---|---|
| Command | dbt subcommand to run | `build`, `run`, `test`, `seed`, `snapshot`, `compile` |
| Selector | `--select` model selector | `stg_orders stg_customers`, `tag:ecommerce`, `fct_daily_revenue+` |
| Target | `--target` dbt target name | `prod`, `dev` |
| Threads | `--threads` concurrency | `4` |
| Full Refresh | Adds `--full-refresh` flag | Checkbox |
| Fail Fast | Adds `--fail-fast` flag | Checkbox |
| Profiles Secret | K8s Secret containing `profiles.yml` | `dbt-profiles` |
Selector Syntax¶
dbt selectors determine which models to run. Common patterns:
# Run specific models
dbt run --select stg_orders stg_customers
# Run all models with a tag
dbt build --select tag:ecommerce
# Run a model and all downstream dependencies
dbt run --select stg_orders+
# Run everything in a directory
dbt run --select models/ecommerce/marts/
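To make the `+` graph operator concrete, here is a small Python sketch (not dbt's implementation) that resolves a `model+` selector over a toy dependency graph:

```python
# Toy model DAG: model -> its direct downstream dependents
DAG = {
    "stg_orders": ["fct_daily_revenue"],
    "stg_customers": ["dim_customer_metrics"],
    "fct_daily_revenue": [],
    "dim_customer_metrics": [],
}

def select_downstream(model: str) -> set[str]:
    """Resolve 'model+': the model itself plus all transitive dependents."""
    selected, stack = set(), [model]
    while stack:
        node = stack.pop()
        if node not in selected:
            selected.add(node)
            stack.extend(DAG.get(node, []))
    return selected

print(sorted(select_downstream("stg_orders")))
# -> ['fct_daily_revenue', 'stg_orders']
```

dbt additionally supports a leading `+` (upstream ancestors) and intersection/union of selectors; the traversal idea is the same.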
dbt Project Structure¶
Models live in the dbt/ directory at the repo root:
dbt/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── ecommerce/ # E-commerce analytics
│ │ ├── staging/
│ │ │ ├── sources.yml # Iceberg table source definitions
│ │ │ ├── stg_orders.sql # Cleaned orders view
│ │ │ └── stg_customers.sql # Cleaned customers view
│ │ ├── marts/
│ │ │ ├── fct_daily_revenue.sql # Daily revenue (incremental)
│ │ │ └── dim_customer_metrics.sql # Customer RFM segmentation
│ │ └── schema.yml # Model docs + tests
│ ├── cost/ # AWS cost attribution
│ │ ├── aws_cur_base.sql
│ │ ├── eks_namespace_costs.sql
│ │ └── ...
│ └── staging/
│ └── sources.yml # Billing source definitions
└── macros/ # Reusable SQL macros
Sources¶
dbt sources map to existing Iceberg tables managed through the Polaris catalog:
# dbt/models/ecommerce/staging/sources.yml
version: 2
sources:
- name: demo
schema: demo
tables:
- name: orders
description: "E-commerce orders with status, amounts, and shipping info"
columns:
- name: order_id
tests: [not_null, unique]
- name: customer_id
tests: [not_null]
- name: customers
description: "Customer profiles with lifetime value and activity tracking"
Staging Models¶
Staging models clean and normalize raw data. They're materialized as views (no storage cost):
-- dbt/models/ecommerce/staging/stg_orders.sql
{{ config(materialized='view', tags=['ecommerce']) }}
select
order_id,
customer_id,
cast(order_date as date) as order_date,
lower(status) as status,
total_amount,
items_count,
lower(shipping_country) as shipping_country,
lower(payment_method) as payment_method,
created_at
from {{ source('demo', 'orders') }}
where status != 'cancelled'
and total_amount > 0
Mart Models¶
Mart models produce business-ready tables. Use incremental materialization for large datasets:
-- dbt/models/ecommerce/marts/fct_daily_revenue.sql
{{ config(materialized='incremental', unique_key=['order_date'], tags=['ecommerce']) }}
select
o.order_date,
count(distinct o.order_id) as total_orders,
count(distinct o.customer_id) as unique_customers,
sum(o.total_amount) as gross_revenue,
avg(o.total_amount) as avg_order_value,
sum(o.items_count) as total_items
from {{ ref('stg_orders') }} o
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1
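The incremental pattern above can be illustrated in plain Python: on each run, only orders newer than the watermark (`max(order_date)` in the existing target) are aggregated and merged in. This is a sketch of the logic, not how dbt or Spark executes it:

```python
from datetime import date

# Existing target rows, keyed by order_date (stands in for {{ this }})
existing = {date(2024, 1, 1): {"total_orders": 10, "gross_revenue": 500.0}}

# New batch of rows from stg_orders
orders = [
    {"order_date": date(2024, 1, 1), "order_id": 11, "total_amount": 40.0},
    {"order_date": date(2024, 1, 2), "order_id": 12, "total_amount": 25.0},
    {"order_date": date(2024, 1, 2), "order_id": 13, "total_amount": 35.0},
]

# is_incremental() branch: select max(order_date) from {{ this }}
watermark = max(existing)
fresh = [o for o in orders if o["order_date"] > watermark]

daily: dict[date, dict] = {}
for o in fresh:  # group by order_date
    agg = daily.setdefault(o["order_date"], {"total_orders": 0, "gross_revenue": 0.0})
    agg["total_orders"] += 1
    agg["gross_revenue"] += o["total_amount"]

existing.update(daily)  # merge on unique_key=order_date
```

Note that the late-arriving order for 2024-01-01 is dropped by the watermark filter; capturing such rows requires a periodic `--full-refresh` or a wider lookback window.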
Data Quality Tests¶
Define tests in schema.yml — they run as a dbt step in the pipeline:
models:
- name: fct_daily_revenue
columns:
- name: order_date
tests: [not_null, unique]
- name: gross_revenue
tests: [not_null]
- name: total_orders
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 1
- name: dim_customer_metrics
columns:
- name: customer_tier
tests:
- accepted_values:
values: ['vip', 'regular', 'new', 'inactive']
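In spirit, each test compiles to a query that returns violating rows, and the test passes when that query is empty. A minimal Python analogue (not dbt's implementation) of `accepted_values` and `accepted_range`:

```python
def accepted_values(rows, column, values):
    """Return rows whose column value falls outside the allowed set."""
    return [r for r in rows if r[column] not in values]

def accepted_range(rows, column, min_value=None, max_value=None):
    """Return rows whose column value violates the inclusive range."""
    return [
        r for r in rows
        if (min_value is not None and r[column] < min_value)
        or (max_value is not None and r[column] > max_value)
    ]

customers = [{"customer_tier": "vip"}, {"customer_tier": "gold"}]
revenue = [{"total_orders": 0}, {"total_orders": 5}]

print(accepted_values(customers, "customer_tier",
                      {"vip", "regular", "new", "inactive"}))
# -> [{'customer_tier': 'gold'}]
print(accepted_range(revenue, "total_orders", min_value=1))
# -> [{'total_orders': 0}]
```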
Profiles & Credentials¶
dbt connects to Iceberg tables through a Spark Thrift Server that uses the Polaris REST catalog.
Spark Target (Production)¶
# profiles.yml
drls:
target: prod
outputs:
prod:
type: spark
method: thrift
host: "{{ env_var('DBT_SPARK_HOST', 'localhost') }}"
port: 10000
schema: billing
connect_retries: 3
connect_timeout: 30
DuckDB Target (Local Dev)¶
dev:
type: duckdb
path: "{{ env_var('DBT_DUCKDB_PATH', 'target/drls_dev.duckdb') }}"
schema: billing
Kubernetes Secret¶
Store credentials as a K8s Secret referenced by the dbt pipeline node:
kubectl create secret generic dbt-profiles \
--from-file=profiles.yml=dbt/profiles.yml \
-n default-drls-clusters
Custom Container¶
The DRLS dbt image (docker/dbt.Dockerfile) includes:
- Python 3.12 base
- dbt-spark (PyHive adapter) — connects to Spark Thrift Server
- dbt-postgres — for metadata/reporting databases
- PyIceberg — direct Polaris REST catalog access
- drls Python package — custom offline store, utilities, macros
FROM python:3.12-slim
RUN pip install --no-cache-dir \
dbt-spark[PyHive]==1.9.* \
dbt-postgres==1.9.* \
pyiceberg[rest,s3fs]>=0.11
# Install drls Python package
ARG DRLS_WHEEL
COPY ${DRLS_WHEEL:-dist/drls-*.whl} /tmp/
RUN pip install --no-cache-dir --no-deps /tmp/drls-*.whl 2>/dev/null || true
ENV DBT_PROJECT_DIR=/opt/drls/dbt \
DBT_PROFILES_DIR=/etc/dbt
WORKDIR /opt/drls/dbt
ENTRYPOINT ["dbt"]
Example Pipelines¶
E-Commerce Analytics (Pure dbt)¶
A medallion architecture pipeline transforming demo tables into analytics-ready marts:
graph LR
O["Iceberg Source<br/>demo.orders"] --> STG["dbt run<br/>stg_orders + stg_customers"]
C["Iceberg Source<br/>demo.customers"] --> STG
STG --> MART["dbt build<br/>fct_daily_revenue + dim_customer_metrics"]
MART --> R["Iceberg Sink<br/>analytics.fct_daily_revenue"]
MART --> M["Iceberg Sink<br/>analytics.dim_customer_metrics"]
What it demonstrates:
- Two dbt steps in sequence (staging views → mart tables)
- Source definitions pointing to Polaris-managed Iceberg tables
- Incremental materialization for `fct_daily_revenue`
- RFM customer segmentation in `dim_customer_metrics`
Hybrid Spark + dbt¶
Combines Spark for real-time ingestion with dbt for batch aggregation:
graph LR
K["Kafka Source<br/>order-events"] --> S["Spark Streaming<br/>Validate & Enrich"]
S --> RAW["Iceberg Sink<br/>raw.validated_orders"]
S --> DBT["dbt build<br/>tag:ecommerce"]
DBT --> A["Iceberg Sink<br/>analytics.fct_daily_revenue"]
What it demonstrates:
- Spark handles streaming ingest (validation, enrichment)
- dbt handles batch transforms (aggregation, business logic)
- Both share the same Iceberg catalog via Polaris
- DAG edges enforce execution order between node types
dbt Data Quality Suite¶
A single-step pipeline for scheduled data quality checks:
graph LR
T["Iceberg Source<br/>demo.*"] --> DQ["dbt test<br/>tag:ecommerce<br/>--fail-fast"]
Run dbt test on a schedule to validate data quality constraints defined in schema.yml.
Seeding Demo Pipelines¶
Load the example pipeline definitions into your DRLS instance:
# Port-forward to the backend
kubectl port-forward svc/ui-server 3000:3000 -n core-services &
# Seed demo pipelines
python scripts/seed_demo_pipelines.py
This creates three pipeline definitions visible in the Pipeline Builder UI.
Environment Variables¶
| Variable | Default | Description |
|---|---|---|
| `DEFAULT_DBT_IMAGE` | `drls/dbt:latest` | Default container image for dbt steps |
| `DBT_PROFILES_DIR` | `/etc/dbt` | Profiles directory inside the container |
| `DBT_PROJECT_DIR` | `/opt/drls/dbt` | Project directory inside the container |
| `DBT_TARGET` | — | dbt target name (overrides profiles.yml default) |
| `DBT_SPARK_HOST` | `localhost` | Spark Thrift Server hostname (used in profiles.yml) |
Node Type Mapping¶
| Pipeline Builder Node | CRD StepType | K8s Resource | Runner |
|---|---|---|---|
| `sparkStream` | SparkStreaming | Pod + Service | Spark driver (native K8s scheduler) |
| `rayProcess` | RayTrain | RayJob CRD | KubeRay |
| `dbt` | Dbt | Job + ConfigMap | dbt CLI in custom container |
| `custom` | Custom | Job | Any container |