ETL Pipelines — Practical
ETL / Data Pipelines — Practical patterns
dbt model
```sql
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

with src as (
    select * from {{ source('raw', 'orders') }}
)

select
    id::bigint            as order_id,
    user_id::bigint       as user_id,
    status                as status,
    amount_cents::numeric as amount_cents,
    created_at::timestamp as created_at,
    updated_at::timestamp as updated_at
from src
where status is not null
```

```sql
-- models/marts/orders_summary.sql
{{ config(materialized='incremental', unique_key='order_id') }}

select
    o.order_id,
    o.user_id,
    o.amount_cents,
    o.created_at,
    o.updated_at,  -- kept in the target so the incremental filter can use it
    u.country
from {{ ref('stg_orders') }} o
join {{ ref('stg_users') }} u using (user_id)
{% if is_incremental() %}
  -- compare against the same column in the target: only rows changed since the last run
  where o.updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```

```yaml
# schema file next to the models (e.g. models/staging/schema.yml)
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests: [not_null, unique]
      - name: status
        tests:
          - accepted_values:
              values: [pending, paid, refunded, canceled]
```

```bash
dbt run --select stg_orders+          # stg_orders and everything downstream of it
dbt test
dbt docs generate && dbt docs serve
```
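To see what the `is_incremental()` branch buys, here is a rough simulation in plain Python with sqlite3 (not dbt itself): each run reads the target's high-water mark on `updated_at`, selects only newer staging rows, and upserts them on `order_id`, which is what `unique_key` asks dbt to do. Table and column names mirror the models above; the `run_incremental` helper and the sample data are hypothetical.

```python
import sqlite3


def run_incremental(conn: sqlite3.Connection) -> int:
    """Hypothetical stand-in for one `dbt run` of orders_summary; returns rows merged."""
    # High-water mark: newest updated_at already in the target (None on the first run).
    (hwm,) = conn.execute("SELECT MAX(updated_at) FROM orders_summary").fetchone()
    rows = conn.execute(
        "SELECT order_id, amount_cents, updated_at FROM stg_orders "
        "WHERE ? IS NULL OR updated_at > ?",
        (hwm, hwm),
    ).fetchall()
    # unique_key='order_id': update matching rows in place, insert the rest.
    conn.executemany(
        "INSERT INTO orders_summary (order_id, amount_cents, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(order_id) DO UPDATE SET "
        "amount_cents = excluded.amount_cents, updated_at = excluded.updated_at",
        rows,
    )
    return len(rows)


conn = sqlite3.connect(":memory:")
ddl = "(order_id INTEGER PRIMARY KEY, amount_cents INTEGER, updated_at TEXT)"
conn.execute("CREATE TABLE stg_orders " + ddl)
conn.execute("CREATE TABLE orders_summary " + ddl)
conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?, ?)",
    [(1, 500, "2026-05-10 09:00"), (2, 700, "2026-05-10 09:30")],
)
print(run_incremental(conn))  # first run, both rows are new -> 2

# Order 1 is edited later (e.g. refunded): only that row is picked up next time.
conn.execute(
    "UPDATE stg_orders SET amount_cents = 0, updated_at = '2026-05-10 11:00' WHERE order_id = 1"
)
print(run_incremental(conn))  # -> 1
print(run_incremental(conn))  # nothing changed since the last run -> 0
```

Because rows are merged on the key, re-running over an overlapping window is harmless; the high-water mark only keeps each run small.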
Airflow DAG

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

with DAG(
    'orders_etl',
    start_date=datetime(2026, 5, 1),
    schedule='@hourly',
    catchup=True,        # create a run for every past hourly interval since start_date
    max_active_runs=1,   # at most one DAG run executes at a time
) as dag:
    # `lambda: ...` is a placeholder; swap in the real extract/load/check callables
    extract = PythonOperator(task_id='extract', python_callable=lambda: ...)
    load = PythonOperator(task_id='load', python_callable=lambda: ...)
    transform = BashOperator(
        task_id='dbt',
        # Airflow renders {{ ds }} (the run's logical date, YYYY-MM-DD) before running the command
        bash_command="dbt run --select stg_orders+ --vars '{ds: {{ ds }}}'",
    )
    quality = PythonOperator(task_id='quality', python_callable=lambda: ...)

    extract >> load >> transform >> quality
```
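With `catchup=True` and an hourly schedule, Airflow creates one run per elapsed hourly data interval since `start_date`, and `{{ ds }}` renders only the date part of each run's logical date. The plain-Python sketch below (no Airflow import; the `hourly_runs` helper is hypothetical and treats naive datetimes as UTC) enumerates those intervals and shows that 24 consecutive runs share one `ds`, so a model that needs hour-level granularity may be better served by `{{ data_interval_start }}` or `{{ ts }}`.

```python
from datetime import datetime, timedelta


def hourly_runs(start: datetime, now: datetime):
    """Hypothetical helper: (data_interval_start, data_interval_end, ds) for every run
    an @hourly DAG with catchup=True would have created by `now`."""
    runs = []
    interval_start = start
    # A run is only created once its whole interval has elapsed.
    while interval_start + timedelta(hours=1) <= now:
        interval_end = interval_start + timedelta(hours=1)
        # For cron-style schedules the logical date equals the interval start.
        runs.append((interval_start, interval_end, interval_start.strftime("%Y-%m-%d")))
        interval_start = interval_end
    return runs


runs = hourly_runs(datetime(2026, 5, 1), datetime(2026, 5, 2, 3, 30))
print(len(runs))                         # 27: 24 intervals on May 1, 3 on May 2
print(len({ds for _, _, ds in runs}))    # 2 distinct ds values
```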
Dagster asset

```python
from dagster import AssetIn, asset

# extract_from_db, transform and aggregate stand for your own functions.
# Dagster also infers inputs from parameter names, so `ins=` is optional here.


@asset
def raw_orders(context):
    return extract_from_db()


@asset(ins={"raw_orders": AssetIn()})
def cleaned_orders(raw_orders):
    return transform(raw_orders)


@asset(ins={"cleaned_orders": AssetIn()})
def order_metrics(cleaned_orders):
    return aggregate(cleaned_orders)
```
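Dagster builds an asset graph from these dependencies and materializes upstream assets before downstream ones. As a stdlib-only illustration of that ordering (not Dagster's scheduler), `graphlib.TopologicalSorter` over the same assets, plus a hypothetical `raw_users` input added to make the graph branch, yields both a valid order and the "waves" that could run in parallel.

```python
from graphlib import TopologicalSorter

# asset -> upstream assets it reads (mirrors the `ins` above;
# raw_users is a hypothetical extra input for illustration)
deps = {
    "raw_orders": set(),
    "raw_users": set(),
    "cleaned_orders": {"raw_orders"},
    "order_metrics": {"cleaned_orders", "raw_users"},
}

order = list(TopologicalSorter(deps).static_order())

# Assets whose inputs are all materialized can run together, wave by wave.
sorter = TopologicalSorter(deps)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)
print(waves)  # [['raw_orders', 'raw_users'], ['cleaned_orders'], ['order_metrics']]
```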
Spark batch (PySpark)

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum  # alias avoids shadowing built-in sum

spark = SparkSession.builder.getOrCreate()

# One day's partition of orders, plus the full users dimension
orders = spark.read.format('parquet').load('s3a://lake/orders/dt=2026-05-10/')
users = spark.read.format('parquet').load('s3a://lake/users/')

joined = (
    orders.join(users, 'user_id')  # inner join on the shared user_id column
    .filter(col('status') == 'paid')
    .groupBy('country')
    .agg(_sum('amount_cents').alias('total'))
)

# Overwrite only this day's output directory, so a rerun replaces rather than appends
joined.write.mode('overwrite').parquet('s3a://lake/marts/sales_by_country/dt=2026-05-10/')
```
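The DataFrame pipeline is an inner join, a filter and a grouped sum. The same logic over in-memory rows, as a plain-Python sketch with made-up sample data (no Spark needed), makes the semantics easy to check, including that an order whose `user_id` has no match in `users` drops out of the inner join.

```python
from collections import defaultdict

# Hypothetical sample rows standing in for the two parquet inputs.
orders = [
    {"order_id": 1, "user_id": 10, "status": "paid", "amount_cents": 1200},
    {"order_id": 2, "user_id": 11, "status": "paid", "amount_cents": 800},
    {"order_id": 3, "user_id": 10, "status": "refunded", "amount_cents": 500},
    {"order_id": 4, "user_id": 99, "status": "paid", "amount_cents": 300},  # no matching user
]
users = [{"user_id": 10, "country": "DE"}, {"user_id": 11, "country": "FR"}]


def sales_by_country(orders, users):
    """Inner join on user_id, keep status == 'paid', sum amount_cents per country.
    Assumes one users row per user_id (a duplicate would fan out rows in Spark)."""
    country_of = {u["user_id"]: u["country"] for u in users}
    totals = defaultdict(int)
    for o in orders:
        country = country_of.get(o["user_id"])
        if country is None:  # inner join: unmatched orders are dropped
            continue
        if o["status"] == "paid":
            totals[country] += o["amount_cents"]
    return dict(totals)


print(sales_by_country(orders, users))  # {'DE': 1200, 'FR': 800}
```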
Flink stream (SQL)

```sql
CREATE TABLE orders_src (
    id           BIGINT,
    user_id      BIGINT,
    status       STRING,
    amount_cents BIGINT,
    ts           TIMESTAMP(3),
    -- tolerate events arriving up to 5 seconds out of order
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', ...);

CREATE TABLE sales_sink (...) WITH ('connector' = 'jdbc', ...);

-- 1-minute tumbling windows per status; a window is emitted once the watermark passes its end
INSERT INTO sales_sink
SELECT
    TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
    status,
    COUNT(*)          AS cnt,
    SUM(amount_cents) AS total
FROM orders_src
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), status;
```
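The watermark bounds how late an event may arrive: a window `[start, start + 60 s)` is finalized roughly when the watermark (max event time seen minus 5 s) reaches its end, and an event for an already-finalized window is dropped. The sketch below imitates that in plain Python over a hypothetical event list, with times in whole seconds; it is a simplified model, not Flink's runtime, which generates watermarks periodically and per source partition.

```python
from collections import defaultdict

WINDOW_S = 60     # TUMBLE(ts, INTERVAL '1' MINUTE)
MAX_DELAY_S = 5   # WATERMARK FOR ts AS ts - INTERVAL '5' SECOND


def tumble(events):
    """events: (ts_seconds, status, amount_cents) in arrival order.
    Returns emitted rows (window_start, status, cnt, total) and the dropped late events."""
    open_windows = defaultdict(lambda: [0, 0])  # (window_start, status) -> [cnt, total]
    emitted, dropped = [], []
    watermark = float("-inf")
    for ts, status, amount in events:
        start = ts - ts % WINDOW_S
        if start + WINDOW_S <= watermark:  # its window has already fired: late event
            dropped.append((ts, status, amount))
            continue
        acc = open_windows[(start, status)]
        acc[0] += 1
        acc[1] += amount
        watermark = max(watermark, ts - MAX_DELAY_S)
        # Fire every window whose end the watermark has reached.
        for key in sorted(k for k in open_windows if k[0] + WINDOW_S <= watermark):
            cnt, total = open_windows.pop(key)
            emitted.append((key[0], key[1], cnt, total))
    return emitted, dropped


events = [
    (10, "paid", 100),
    (58, "paid", 200),
    (62, "paid", 50),
    (61, "pending", 70),
    (57, "paid", 25),    # out of order, but window [0, 60) is still open (watermark 57)
    (66, "paid", 10),    # watermark -> 61: window [0, 60) fires
    (59, "paid", 999),   # too late: its window has fired, so it is dropped
    (125, "paid", 1),    # watermark -> 120: windows starting at 60 fire
]
emitted, dropped = tumble(events)
print(emitted)  # [(0, 'paid', 3, 325), (60, 'paid', 2, 60), (60, 'pending', 1, 70)]
print(dropped)  # [(59, 'paid', 999)]
```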
Debezium PG → Kafka

```json
{
  "name": "pg-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg.svc",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${DEBEZIUM_PASSWORD}",
    "database.dbname": "app",
    "table.include.list": "public.orders,public.users",
    "plugin.name": "pgoutput",
    "snapshot.mode": "initial",
    "topic.prefix": "cdc.app"
  }
}
```
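Each row change lands on a topic named `<topic.prefix>.<schema>.<table>` (here `cdc.app.public.orders`) as an envelope with `before`, `after` and `op`, where `op` is `r` (snapshot read), `c` (create), `u` (update) or `d` (delete); by default a delete is followed by a tombstone with a null value. A consumer maintaining a replica could apply them as below. This is a plain-Python sketch over hand-written, simplified payloads; real messages also carry `source` and `ts_ms`, and with the JSON converter's schemas enabled the payload sits inside a `schema`/`payload` wrapper.

```python
import json
from typing import Optional


def apply_change(replica: dict, message: Optional[str]) -> None:
    """Apply one simplified Debezium change event (payload only) to a replica keyed by id."""
    if message is None:  # tombstone emitted after a delete: nothing to apply
        return
    event = json.loads(message)
    op = event["op"]
    if op in ("r", "c", "u"):  # snapshot read, insert, update: upsert the new row image
        row = event["after"]
        replica[row["id"]] = row
    elif op == "d":  # delete: only the old row image (at least the key) is present
        replica.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")


# Hand-written events for topic cdc.app.public.orders (illustrative only).
messages = [
    '{"op": "r", "before": null, "after": {"id": 1, "status": "pending"}}',
    '{"op": "c", "before": null, "after": {"id": 2, "status": "pending"}}',
    '{"op": "u", "before": null, "after": {"id": 1, "status": "paid"}}',
    '{"op": "d", "before": {"id": 2}, "after": null}',
    None,
]

replica = {}
for m in messages:
    apply_change(replica, m)
print(replica)  # {1: {'id': 1, 'status': 'paid'}}
```

Every operation here is an upsert or a keyed delete, so replaying the stream yields the same replica, which matters because Kafka Connect delivery is at-least-once.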
Idempotent upsert (Snowflake)

```sql
MERGE INTO target t
USING (
    -- keep only the latest row per id: duplicate source keys make MERGE nondeterministic
    SELECT *
    FROM source
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1
) s
ON t.id = s.id
WHEN MATCHED AND s.updated_at > t.updated_at THEN
    UPDATE SET status = s.status, updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (id, status, updated_at) VALUES (s.id, s.status, s.updated_at);
```
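The `updated_at` guard is what makes the MERGE safe to replay: applying the same batch twice, or an older batch after a newer one, leaves the target unchanged. SQLite has no MERGE, so this sketch swaps in its `INSERT ... ON CONFLICT DO UPDATE ... WHERE` upsert with the same guard; the table name follows the SQL above and the data is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")

# The WHERE on DO UPDATE plays the role of `WHEN MATCHED AND s.updated_at > t.updated_at`.
UPSERT = """
INSERT INTO target (id, status, updated_at) VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET status = excluded.status, updated_at = excluded.updated_at
WHERE excluded.updated_at > target.updated_at
"""


def merge(batch):
    conn.executemany(UPSERT, batch)
    return sorted(conn.execute("SELECT id, status, updated_at FROM target"))


new_batch = [(1, "paid", "2026-05-10T10:00"), (2, "pending", "2026-05-10T10:05")]
old_batch = [(1, "pending", "2026-05-10T09:00")]  # stale version of order 1

first = merge(new_batch)
replayed = merge(new_batch)      # the same batch again
after_stale = merge(old_batch)   # an older row arrives late
print(first == replayed == after_stale)  # True
print(first)  # [(1, 'paid', '2026-05-10T10:00'), (2, 'pending', '2026-05-10T10:05')]
```

ISO-8601 strings compare correctly as text, which is why the guard works here without a real timestamp type.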
Great Expectations check

```yaml
expectations:
  - expect_column_values_to_not_be_null: { column: order_id }
  - expect_column_values_to_be_unique: { column: order_id }
  - expect_column_values_to_be_in_set:
      column: status
      value_set: [pending, paid, refunded, canceled]
  - expect_table_row_count_to_be_between: { min_value: 1000, max_value: 1000000 }
```
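Each expectation is a predicate over the batch. A stdlib-only sketch of the same four checks shows what each one asserts; it is not the Great Expectations API, and the `validate` function and sample rows are hypothetical.

```python
ALLOWED_STATUS = {"pending", "paid", "refunded", "canceled"}


def validate(rows, min_rows=1000, max_rows=1_000_000):
    """Return {expectation: passed} for the four checks in the YAML above."""
    ids = [r["order_id"] for r in rows]
    non_null_ids = [i for i in ids if i is not None]
    return {
        "not_null(order_id)": len(non_null_ids) == len(ids),
        # nulls are left to the not-null check; uniqueness is judged on present values
        "unique(order_id)": len(set(non_null_ids)) == len(non_null_ids),
        "in_set(status)": all(
            r["status"] in ALLOWED_STATUS for r in rows if r["status"] is not None
        ),
        "row_count_between": min_rows <= len(rows) <= max_rows,
    }


rows = [{"order_id": i, "status": "paid"} for i in range(1, 1001)]
rows.append({"order_id": 1, "status": "shipped"})  # duplicate id and an unknown status

results = validate(rows)
for name, ok in results.items():
    print("PASS" if ok else "FAIL", name)
```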
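Partitioning convention

```text
s3://lake/raw/orders/dt=2026-05-10/region=eu/*.parquet
s3://lake/marts/sales_by_country/dt=2026-05-10/
```

In Iceberg, Delta Lake and Hudi, partition information is tracked in table metadata rather than inferred only from directory names; Iceberg in particular lets the partition spec evolve without rewriting existing data.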
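Hive-style `key=value` directories let engines prune partitions from the path alone. A small sketch of building and parsing such paths follows; the `partition_path` and `parse_partitions` helpers are hypothetical, and the bucket and prefix follow the examples above.

```python
from datetime import date
from urllib.parse import quote, unquote


def partition_path(base: str, **parts) -> str:
    """Build base/key=value/.../ with keys in the given order.
    Values are percent-encoded so a '/' or '=' inside a value cannot break the layout."""
    segments = [f"{k}={quote(str(v), safe='')}" for k, v in parts.items()]
    return base.rstrip("/") + "/" + "/".join(segments) + "/"


def parse_partitions(path: str) -> dict:
    """Recover {key: value} from every key=value segment of a path."""
    return {
        k: unquote(v)
        for seg in path.strip("/").split("/")
        if "=" in seg
        for k, v in [seg.split("=", 1)]
    }


p = partition_path("s3://lake/raw/orders", dt=date(2026, 5, 10), region="eu")
print(p)                    # s3://lake/raw/orders/dt=2026-05-10/region=eu/
print(parse_partitions(p))  # {'dt': '2026-05-10', 'region': 'eu'}
```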
Tools quick map
- Ingestion: Fivetran, Airbyte, Debezium, Kafka Connect.
- Lake formats: Iceberg, Delta Lake, Hudi.
- Engines: Spark, Trino, Presto, DuckDB, Polars.
- Streaming: Kafka Streams, Flink, Spark Structured Streaming, Materialize.
- Warehouse: Snowflake, BigQuery, Redshift, Databricks, ClickHouse.
- Transform: dbt, SQLMesh.
- Orchestrator: Airflow, Dagster, Prefect, Temporal, Argo.
- Quality: Great Expectations, Soda, dbt tests, Monte Carlo.
- Catalog and lineage: DataHub, Atlan, Unity Catalog, OpenLineage, Marquez.