Skip to content

ETL Pipelines — Practical

ETL / Data Pipelines — Practical patterns

Section titled “ETL / Data Pipelines — Practical patterns”
-- models/staging/stg_orders.sql
-- Staging view over raw.orders: renames `id` to `order_id`, casts each raw
-- column to its typed form, and keeps only rows that carry a status.
{{ config(materialized='view') }}

with raw_orders as (
    select
        id,
        user_id,
        status,
        amount_cents,
        created_at,
        updated_at
    from {{ source('raw', 'orders') }}
)

select
    cast(id as bigint)            as order_id,
    cast(user_id as bigint)       as user_id,
    status,
    -- amount is kept in cents (integer minor units), cast to numeric
    cast(amount_cents as numeric) as amount_cents,
    cast(created_at as timestamp) as created_at,
    cast(updated_at as timestamp) as updated_at
from raw_orders
-- rows without a status are dropped here, upstream of the accepted_values test
where status is not null
-- models/marts/orders_summary.sql
-- Incremental mart: one row per order (merged on order_id), enriched with the
-- ordering user's country.
--
-- updated_at is emitted so the incremental filter can compare the source's
-- updated_at with the latest updated_at already loaded (the previous version
-- compared it with max(created_at), a different column). An existing table
-- built before this change lacks the column: run `dbt run --full-refresh
-- --select orders_summary` once after deploying.
--
-- NOTE(review): the inner join drops orders whose user_id is not (yet) in
-- stg_users; such orders are only picked up again if they are updated later.
{{ config(materialized='incremental', unique_key='order_id') }}

select
    o.order_id,
    o.user_id,
    o.amount_cents,
    o.created_at,
    u.country,
    o.updated_at
from {{ ref('stg_orders') }} as o
inner join {{ ref('stg_users') }} as u using (user_id)
{% if is_incremental() %}
-- `>=` re-selects rows sharing the boundary timestamp; the merge on order_id
-- makes that idempotent. coalesce stops an empty target from producing a NULL
-- watermark, which would filter out every row forever.
where o.updated_at >= (
    select coalesce(max(updated_at), cast('1900-01-01' as timestamp))
    from {{ this }}
)
{% endif %}
# models/schema.yml
# Column tests for the stg_orders staging model.
version: 2

models:
  - name: stg_orders
    columns:
      - name: order_id
        tests: [not_null, unique]
      - name: status
        tests:
          - accepted_values:
              values: [pending, paid, refunded, canceled]
Terminal window
# Build stg_orders plus every model downstream of it (-s is short for --select).
dbt run -s 'stg_orders+'
# Run all data tests defined in the project.
dbt test
# Build the docs site; serve it locally only if generation succeeded.
dbt docs generate && dbt docs serve
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
# Hourly orders pipeline: extract -> load -> dbt transform -> quality checks.
# catchup=True schedules a run for every hourly interval since start_date;
# max_active_runs=1 keeps those runs from executing concurrently.
with DAG('orders_etl',
         start_date=datetime(2026, 5, 1),
         schedule='@hourly',
         catchup=True,
         max_active_runs=1) as dag:
    extract = PythonOperator(task_id='extract', python_callable=lambda: ...)
    load = PythonOperator(task_id='load', python_callable=lambda: ...)
    # Select stg_orders and its descendants, the model used by the dbt project;
    # the previous `orders+` selector named no model defined there.
    # dbt parses --vars as YAML, so the date value is quoted: unquoted, YAML
    # coerces 2026-05-10 into a date object instead of a string.
    # NOTE(review): ds is the logical *date*, so all hourly runs of one day pass
    # the same value; pass data_interval_start/end if the model needs the hour.
    transform = BashOperator(
        task_id='dbt',
        bash_command=(
            "dbt run --select stg_orders+ "
            "--vars '{\"ds\": \"{{ ds }}\"}'"
        ),
    )
    quality = PythonOperator(task_id='quality', python_callable=lambda: ...)
    extract >> load >> transform >> quality
from dagster import asset, AssetIn
# Linear asset graph: raw_orders -> cleaned_orders -> order_metrics.
# Each downstream asset names its upstream asset as a parameter; Dagster
# resolves the dependency from that parameter name, so no explicit `ins=`
# mapping is needed.


@asset
def raw_orders(context):
    # Pull the raw order rows (extract_from_db is defined elsewhere).
    return extract_from_db()


@asset
def cleaned_orders(raw_orders):
    # Apply the cleaning step to the upstream raw_orders value.
    return transform(raw_orders)


@asset
def order_metrics(cleaned_orders):
    # Aggregate the cleaned orders into metrics.
    return aggregate(cleaned_orders)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
# Batch job: total paid order amount (cents) per user country for one date
# partition, written to the matching partition of the sales_by_country mart.
spark = SparkSession.builder.getOrCreate()

# Single source of truth for the partition date, so the input and output
# paths cannot drift apart.
run_date = '2026-05-10'

orders = spark.read.format('parquet').load(f's3a://lake/orders/dt={run_date}/')
# NOTE(review): reads the whole users prefix; if it is date-partitioned too,
# every snapshot is joined -- confirm the layout.
users = spark.read.format('parquet').load('s3a://lake/users/')

# Filter orders before joining, so `status` unambiguously refers to the orders
# column even if users also has a status column.
joined = (orders.filter(col('status') == 'paid')
          .join(users, 'user_id')
          .groupBy('country')
          .agg(_sum('amount_cents').alias('total')))

# 'overwrite' targets this date's partition directory only.
joined.write.mode('overwrite').parquet(
    f's3a://lake/marts/sales_by_country/dt={run_date}/'
)
-- Kafka-backed source of order events; ts is the event-time attribute.
CREATE TABLE orders_src (
    id           BIGINT,
    user_id      BIGINT,
    status       STRING,
    amount_cents BIGINT,
    ts           TIMESTAMP(3),
    -- watermark trails event time by 5 seconds (tolerated out-of-orderness)
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    ...
);

-- JDBC sink for the per-minute aggregates (schema and options elided).
CREATE TABLE sales_sink (
    ...
) WITH (
    'connector' = 'jdbc',
    ...
);
-- Per-minute, per-status order counts and amount totals over event time.
-- Uses the windowing table-valued function (TVF) form; the legacy
-- GROUP BY TUMBLE(...) / TUMBLE_START(...) group-window syntax is deprecated.
-- Output column order (window_start, status, cnt, total) matches the original,
-- because INSERT INTO maps columns to sales_sink by position.
INSERT INTO sales_sink
SELECT
    window_start,
    status,
    COUNT(*)          AS cnt,
    SUM(amount_cents) AS total
FROM TABLE(
    TUMBLE(TABLE orders_src, DESCRIPTOR(ts), INTERVAL '1' MINUTE)
)
-- window aggregation requires both window_start and window_end in GROUP BY
GROUP BY window_start, window_end, status;
{
"name": "pg-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg.svc",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${DEBEZIUM_PASSWORD}",
"database.dbname": "app",
"table.include.list": "public.orders,public.users",
"plugin.name": "pgoutput",
"snapshot.mode": "initial",
"topic.prefix": "cdc.app"
}
}
-- Upsert source into target, keeping the newest version of each id.
-- The source is first reduced to one row per id: MERGE errors or behaves
-- nondeterministically when several source rows match the same target row
-- (common with CDC batches holding multiple changes per key).
MERGE INTO target t
USING (
    SELECT id, status, updated_at
    FROM (
        SELECT
            id,
            status,
            updated_at,
            -- NULLS LAST keeps a NULL updated_at from ranking as "latest";
            -- drop it on engines without NULLS LAST (e.g. SQL Server).
            -- NOTE(review): ties on updated_at pick an arbitrary row; add a
            -- tiebreaker column if the source has one.
            ROW_NUMBER() OVER (
                PARTITION BY id
                ORDER BY updated_at DESC NULLS LAST
            ) AS rn
        FROM source
    ) ranked
    WHERE rn = 1
) s
    ON t.id = s.id
-- A NULL t.updated_at would make the plain comparison UNKNOWN and block the
-- update forever; treat it as older than any incoming version.
WHEN MATCHED AND (t.updated_at IS NULL OR s.updated_at > t.updated_at) THEN
    UPDATE SET status = s.status, updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (id, status, updated_at) VALUES (s.id, s.status, s.updated_at);
# Data-quality expectations for the orders data.
expectations:
  - expect_column_values_to_not_be_null: { column: order_id }
  - expect_column_values_to_be_unique: { column: order_id }
  - expect_column_values_to_be_in_set:
      column: status
      value_set: [pending, paid, refunded, canceled]
  # NOTE(review): example bounds -- tune to real daily volumes.
  - expect_table_row_count_to_be_between: { min_value: 1000, max_value: 1000000 }
s3://lake/raw/orders/dt=2026-05-10/region=eu/*.parquet
s3://lake/marts/sales_by_country/dt=2026-05-10/

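In Iceberg, Delta Lake, and Hudi, partitions are tracked in table metadata rather than inferred solely from Hive-style directory paths like the ones above. Iceberg additionally supports partition evolution: the partition spec can change without rewriting existing data.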

  • Ingestion: Fivetran, Airbyte, Debezium, Kafka Connect.
  • Lake formats: Iceberg, Delta Lake, Hudi.
  • Engines: Spark, Trino, Presto, DuckDB, Polars.
  • Streaming: Kafka Streams, Flink, Spark Structured Streaming, Materialize.
  • Warehouse: Snowflake, BigQuery, Redshift, Databricks, ClickHouse.
  • Transform: dbt, SQLMesh.
  • Orchestrator: Airflow, Dagster, Prefect, Temporal, Argo.
  • Quality: Great Expectations, Soda, dbt tests, Monte Carlo.
  • Catalog & lineage: DataHub, Atlan, Unity Catalog; OpenLineage (open lineage standard) and Marquez (its reference implementation).