# Data Engineering Cheatsheet

ETL/ELT patterns, data modeling, warehouse concepts, orchestration and reliability basics.
| Aspect | ETL | ELT |
|---|---|---|
| Order | Extract → Transform → Load | Extract → Load → Transform |
| Transform location | Staging server / ETL tool | Inside the target data warehouse |
| Best for | Small-medium data, on-prem | Cloud data warehouses, big data |
| Tools | Informatica, SSIS, Talend | dbt, Snowflake, BigQuery, Databricks |
| Performance | Limited by ETL server | Leverages warehouse compute power |
| Flexibility | Pre-defined transforms | Ad-hoc, exploratory transforms |
| Latency | Often batch, scheduled | Near real-time possible |
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Data scope | Bounded (complete dataset) | Unbounded (continuous) |
| Latency | Minutes to hours | Milliseconds to seconds |
| Throughput | Very high | Variable, depends on event rate |
| Processing model | Collect → Process → Output | Event-by-event or micro-batch |
| Tools | Spark, Airflow, dbt | Kafka Streams, Flink, Spark Streaming |
| Use cases | Daily reports, ML training | Fraud detection, live dashboards |
| Fault tolerance | Rerun the batch | Checkpoint + replay |
| Complexity | Simpler | Windowing, watermarks, state management |
# The Modern Data Stack — representative tools per layer of the platform.
stack = {
    'ingestion': ['Fivetran', 'Airbyte', 'dlt', 'Estuary'],
    'storage': ['S3', 'GCS', 'ADLS', 'Delta Lake', 'Iceberg'],
    'warehouse': ['Snowflake', 'BigQuery', 'Redshift', 'Databricks'],
    'transform': ['dbt', 'SQLMesh', 'Dataform'],
    'orchestrate': ['Airflow', 'Dagster', 'Prefect', 'Mage'],
    'observe': ['Monte Carlo', 'Great Expectations', 'dbt tests'],
    'catalog': ['DataHub', 'OpenMetadata', 'Atlan'],
    'stream': ['Kafka', 'Kinesis', 'Pulsar', 'Flink'],
}

import pandas as pd
import requests
from sqlalchemy import create_engine
# ─── EXTRACT ───
def extract_api(url: str, timeout: float = 30.0) -> pd.DataFrame:
    """Extract all pages from a paginated REST API into a DataFrame.

    Args:
        url: Endpoint URL; pagination is driven via a ``page`` query param.
        timeout: Per-request timeout in seconds. The original had none, so a
            hung socket could stall the whole pipeline forever.

    Returns:
        DataFrame with one row per result item (empty frame if no results).

    Raises:
        requests.HTTPError: On any non-2xx response.
    """
    all_data = []
    page = 1
    while True:
        resp = requests.get(url, params={'page': page}, timeout=timeout)
        resp.raise_for_status()
        # .get() guards against payloads that omit 'results' entirely —
        # previously this raised KeyError instead of ending the loop.
        results = resp.json().get('results')
        if not results:
            break
        all_data.extend(results)
        page += 1
    return pd.DataFrame(all_data)
def extract_db(query: str, connection_string: str) -> pd.DataFrame:
    """Extract rows from a relational database into a DataFrame.

    Args:
        query: SQL SELECT statement to execute.
        connection_string: SQLAlchemy URL for the source database.

    Returns:
        Query result as a DataFrame.
    """
    engine = create_engine(connection_string)
    try:
        return pd.read_sql(query, engine)
    finally:
        # Release pooled connections — the original leaked the engine on
        # every call.
        engine.dispose()
# ─── TRANSFORM ───
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Standard data cleaning pipeline.

    Steps: deduplicate on ``id``, fill/drop nulls, cast types, then drop
    3-sigma outliers on ``amount``.

    Args:
        df: Raw frame with at least the columns ``id``, ``user_id``,
            ``email``, ``score``, ``created_at`` and ``amount``.

    Returns:
        Cleaned frame; the caller's input frame is not mutated.
    """
    # Deduplicate (keeps the first occurrence per id).
    df = df.drop_duplicates(subset=['id'])
    # Handle nulls.
    df = df.fillna({'email': 'unknown', 'score': 0})
    df = df.dropna(subset=['user_id'])
    # Type casting — unparseable values become NaT/NaN rather than raising.
    df = df.assign(
        created_at=pd.to_datetime(df['created_at'], errors='coerce'),
        amount=pd.to_numeric(df['amount'], errors='coerce'),
    )
    # Remove outliers (3 sigma). Guard degenerate input: with fewer than two
    # rows std() is NaN, and the original between(NaN, NaN) filter silently
    # dropped every row.
    mean, std = df['amount'].mean(), df['amount'].std()
    if pd.notna(std) and std > 0:
        df = df[df['amount'].between(mean - 3 * std, mean + 3 * std)]
    return df
# ─── LOAD ───
def load_to_db(df: pd.DataFrame, table: str, conn_str: str,
               if_exists='replace'):
    """Load a DataFrame into a database table.

    Args:
        df: Frame to load.
        table: Destination table name.
        conn_str: SQLAlchemy URL for the target database.
        if_exists: Passed through to ``DataFrame.to_sql``
            ('replace' | 'append' | 'fail').
    """
    engine = create_engine(conn_str)
    try:
        # chunksize batches the inserts so huge frames don't build one
        # giant INSERT statement in memory.
        df.to_sql(table, engine, if_exists=if_exists, index=False,
                  chunksize=10000)
    finally:
        engine.dispose()  # the original leaked pooled connections

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('ETL').getOrCreate()

# ─── Extract ───
# Read a month of raw JSON event files from S3.
raw = spark.read.json('s3://raw/events/2024-01-*')

# ─── Transform ───
# Drop malformed events, derive timestamp/date columns, dedupe on event_id.
cleaned = (
    raw
    .filter(F.col('event_type').isNotNull())
    .withColumn('event_time', F.to_timestamp('timestamp'))
    .withColumn('date', F.to_date('event_time'))
    .filter(F.col('event_time').isNotNull())  # unparseable timestamps
    .dropDuplicates(['event_id'])
)

# Daily rollup per event type and country.
aggregated = (
    cleaned
    .groupBy('date', 'event_type', 'country')
    .agg(
        F.count('*').alias('event_count'),
        F.avg('value').alias('avg_value'),
        F.sum('revenue').alias('total_revenue'),
    )
)

# ─── Load ───
# Partition by date so downstream readers can prune by day.
aggregated.write \
    .partitionBy('date') \
    .mode('overwrite') \
    .parquet('s3://processed/aggregated/')

import great_expectations as gx
# Define expectations (data quality rules) against a pandas DataFrame.
context = gx.get_context()
validator = context.sources.pandas_default.read_dataframe(df)

# Column-level rules.
validator.expect_column_values_to_not_be_null('user_id')
validator.expect_column_values_to_be_between('age', min_value=0, max_value=150)
validator.expect_column_values_to_be_in_set('status', ['active', 'inactive', 'pending'])
validator.expect_column_values_to_match_regex('email', r'^[\w.-]+@[\w.-]+\.\w+$')
validator.expect_column_values_to_be_unique('id')
# Table-level rule: catches sudden volume drops/spikes.
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)
validator.expect_column_value_lengths_to_be_between('phone', 10, 15)

# Run validation and page someone if it fails.
result = validator.validate()
if not result.success:
    alert_team(result)

# 💡 Incremental extraction: filter on updated_at timestamps or watermark
# columns to avoid reprocessing the entire dataset on every run.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.utils.dates import days_ago
from datetime import timedelta  # was missing — timedelta is used below

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,   # each run is independent of the previous one
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

with DAG(
    dag_id='daily_sales_etl',
    default_args=default_args,
    description='Daily sales data ETL pipeline',
    schedule_interval='0 6 * * *',  # 6 AM daily (cron)
    start_date=days_ago(1),
    catchup=False,                  # skip historical runs
    max_active_runs=1,              # never overlap two daily runs
    tags=['etl', 'sales', 'production'],
) as dag:
    extract = PythonOperator(
        task_id='extract_sales',
        python_callable=extract_sales_data,
        op_kwargs={'date': '{{ ds }}',  # execution date YYYY-MM-DD
                   'source': 'postgres'},
    )
    clean = PythonOperator(
        task_id='clean_data',
        python_callable=clean_sales_data,
    )
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_snowflake,
    )
    validate = PythonOperator(
        task_id='validate_output',
        python_callable=run_data_quality_checks,
    )

    # Pipeline dependency: simple linear chain.
    extract >> clean >> load >> validate

# Branching
from airflow.operators.python import BranchPythonOperator
def branch_logic(**context):
if context['ti'].xcom_pull(task_ids='check_data')['count'] > 0:
return 'process_data'
return 'skip_processing'
branch = BranchPythonOperator(
task_id='branch',
python_callable=branch_logic,
)
process = PythonOperator(task_id='process_data', python_callable=process)
skip = EmptyOperator(task_id='skip_processing')
branch >> [process, skip]
# Trigger rules (when to run downstream)
downstream = PythonOperator(
task_id='always_run',
python_callable=my_func,
trigger_rule='all_done', # options: all_success, all_done,
) # one_success, none_failed, one_failed
# Dynamic task mapping (Airflow 2.3+)
expand = PythonOperator.partial(
task_id='process_file',
python_callable=process_single_file,
).expand(filename=get_file_list())
# Sensors — wait for external conditions
from airflow.providers.sftp.sensors.sftp import SFTPSensor
wait_for_file = SFTPSensor(
task_id='wait_for_file',
path='/upload/daily_sales.csv',
sftp_conn_id='sftp_default',
timeout=60 * 60, # 1 hour
poke_interval=300, # check every 5 min
mode='poke', # 'poke' or 'reschedule'
)| Concept | Description | Key Detail |
|---|---|---|
| DAG | Directed Acyclic Graph | Defines task dependencies |
| Task | Unit of work | Operator instance in a DAG |
| Operator | Template for a task | PythonOperator, BashOperator, etc. |
| Sensor | Waits for condition | External file, DB row, API status |
| Hook | Connection interface | Reusable connection to external systems |
| XCom | Cross-task communication | Push/pull small data between tasks (< 48KB) |
| Connection | External system config | URI + credentials stored in Airflow DB |
| Variable | Key-value store | Runtime configuration, Jinja-accessible |
| Pool | Resource slot manager | Limit parallel tasks on a resource |
| Trigger Rule | Downstream condition | all_success (default), all_done, one_success |
💡 Don't push large datasets through XCom — stage them in s3:// or gcs:// as an intermediary and pass only the file path as an XCom.

# ─── Star Schema ───
# Central fact table + surrounding dimension tables
# Fact table: one row per event/transaction
# fact_sales (event_id, date_key, product_key, store_key, customer_key,
# units_sold, revenue, cost)
# Dimension tables: descriptive attributes
# dim_date (date_key, date, day_of_week, month, quarter, year, is_holiday)
# dim_product (product_key, name, category, subcategory, brand)
# dim_store (store_key, name, region, city, country, sq_ft)
# dim_customer (customer_key, name, segment, age_group, loyalty_tier)

# ─── Slowly Changing Dimensions (SCD) ───
# Type 1: Overwrite — no history
# UPDATE dim_customer SET segment = 'Enterprise' WHERE key = 42;
# Type 2: Add new row — full history
# INSERT INTO dim_customer (key, name, segment, valid_from, valid_to, is_current)
# VALUES (43, 'Acme Corp', 'Enterprise', '2024-01-15', '9999-12-31', true)
# UPDATE dim_customer SET valid_to = '2024-01-14', is_current = false
# WHERE key = 42;
# Type 3: Add column — partial history
# ALTER TABLE dim_customer ADD COLUMN prev_segment;
# UPDATE dim_customer SET prev_segment = segment WHERE key = 42;
# UPDATE dim_customer SET segment = 'Enterprise' WHERE key = 42;
# dbt SCD Type 2 macro
# {{ dbt_utils.surrogate_key(['customer_id', 'valid_from']) }}

# ─── dbt Models — Layered Architecture ───
-- models/staging/stg_orders.sql
-- Staging layer: clean and standardize raw source data.
SELECT
    order_id,
    customer_id,
    order_date,
    status,
    CAST(total_amount AS NUMERIC(12,2)) AS amount,
    COALESCE(discount_code, 'NONE') AS promo_code
FROM {{ source('raw', 'orders') }}
WHERE order_date >= '2020-01-01'

-- models/intermediate/int_customer_orders.sql
-- Intermediate layer: join + aggregate per customer.
SELECT
    c.customer_id,
    c.customer_name,
    c.segment,
    COUNT(DISTINCT o.order_id) AS order_count,
    SUM(o.amount) AS lifetime_value,
    MAX(o.order_date) AS last_order_date
FROM {{ ref('stg_customers') }} c
LEFT JOIN {{ ref('stg_orders') }} o ON c.customer_id = o.customer_id
GROUP BY 1, 2, 3

-- models/marts/customer_analytics.sql
-- Mart layer: business-facing loyalty tiers per customer.
SELECT
    customer_id,
    customer_name,
    segment,
    lifetime_value,
    CASE
        WHEN lifetime_value > 10000 THEN 'Platinum'
        WHEN lifetime_value > 5000 THEN 'Gold'
        WHEN lifetime_value > 1000 THEN 'Silver'
        ELSE 'Bronze'
    END AS loyalty_tier
FROM {{ ref('int_customer_orders') }}

| Pattern | Structure | Pros | Cons |
|---|---|---|---|
| Star | 1 fact + N dimensions | Simple queries, fast joins | Redundancy in dims |
| Snowflake | Normalized dimensions | Less storage, cleaner | More joins, slower queries |
| Galaxy | Multiple star schemas | Covers many processes | Complex to maintain |
| One Big Table | Denormalized flat table | Fastest reads, simplest | Large storage, update anomalies |
| Data Vault | Hub + Link + Satellite | Audit-friendly, flexible | Complex, verbose queries |
import boto3

s3 = boto3.client('s3')

# ─── Data Lake Zone Pattern ───
# s3://my-lake/
# ├── raw/        # Unchanged source data (bronze)
# ├── curated/    # Cleaned, deduplicated (silver)
# ├── analytics/  # Aggregated, business-ready (gold)
# └── sandbox/    # Experimental / ad-hoc queries

# Write partitions efficiently with hive-style key=value directories:
# s3://lake/curated/sales/year=2024/month=01/day=15/
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName('DeltaLake') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog',
            'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .getOrCreate()

# Create a Delta table from a DataFrame.
df.write.format('delta').save('/delta/events')

# Upsert (merge): update matching ids, insert new ones.
delta_table = DeltaTable.forPath(spark, '/delta/events')
delta_table.alias('target').merge(
    updates_df.alias('source'),
    'target.id = source.id'
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Time travel — query data at a specific version.
spark.read.format('delta') \
    .option('versionAsOf', 5) \
    .load('/delta/events')

# Vacuum old versions to reclaim storage; 168 hours = 7 days of history.
delta_table.vacuum(168)

# Apache Iceberg — open table format
spark.conf.set('spark.sql.catalog.local', 'org.apache.iceberg.spark.SparkCatalog')
spark.conf.set('spark.sql.catalog.local.warehouse', 's3://my-warehouse/')
# Create Iceberg table
spark.sql('''
CREATE TABLE local.db.events (
id BIGINT,
event_time TIMESTAMP,
event_type STRING,
value DOUBLE
)
USING iceberg
PARTITIONED BY (days(event_time))
TBLPROPERTIES ('write.format.default' = 'parquet')
''')
# Write
spark.sql('INSERT INTO local.db.events SELECT * FROM new_events')
# Time travel
spark.read.format('iceberg') \
.option('snapshot-id', 123456789) \
.load('local.db.events')
# Schema evolution (no rewrite needed!)
spark.sql('ALTER TABLE local.db.events ADD COLUMN metadata MAP<STRING, STRING>')| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| Creator | Databricks | Netflix (Apache) | Uber (Apache) |
| Time travel | Yes (versions) | Yes (snapshots) | Yes (timeline) |
| Schema evolution | Add/rename cols | Full (add, drop, rename, reorder) | Full evolution |
| Partition evolution | No | Yes (hidden partitioning) | Limited |
| Multi-engine | Spark-only (OSS) | Spark, Flink, Trino, Presto | Spark, Flink |
| ACID transactions | Yes | Yes | Yes |
| Upsert / Merge | Yes (optimized) | Yes | Yes (optimized) |
| Row-level deletes | Yes | Yes (position or equality) | Yes (optimized) |
| Branch / Tag | No | Yes | No |
| Best with | Databricks platform | Multi-engine / open lakehouse | Streaming upsert workloads |
from kafka import KafkaProducer, KafkaConsumer
import json

# ─── Producer ───
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8'),
    acks='all',               # wait for all in-sync replicas (durability)
    retries=3,
    linger_ms=10,             # batch small messages for throughput
    compression_type='gzip',
)

producer.send('events', key='user_42', value={
    'event_type': 'page_view',
    'timestamp': '2024-01-15T10:30:00Z',
    'page': '/products/123',
})
producer.flush()  # block until buffered messages are delivered

# ─── Consumer ───
consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='analytics-consumer',
    auto_offset_reset='earliest',  # 'earliest' or 'latest'
    enable_auto_commit=False,      # commit manually, only after processing
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for message in consumer:
    process_event(message.value)
    # Committing after processing gives at-least-once delivery (the original
    # comment said "exactly-once", which needs transactions — see below).
    consumer.commit()

# ─── Kafka Concepts ───
# Core Kafka vocabulary.
concepts = {
    'Topic': 'Logical category / stream of events',
    'Partition': 'Ordered, immutable sequence within a topic',
    'Offset': 'Unique index per message within a partition',
    'Consumer Group': 'Group of consumers sharing load; '
                      'each partition assigned to one consumer',
    'Broker': 'Kafka server that stores topic partitions',
    'Replica': 'Copy of a partition for fault tolerance',
    'Leader': 'Replica that handles reads/writes',
    'ISR': 'In-Sync Replicas — fully caught up with leader',
}

# Topic design patterns
# ─── Event-type topics (recommended) ───
# orders, payments, shipments, user_events
#
# ─── Per-entity topics (for high-throughput entities) ───
# orders, user_clicks, sensor_readings
#
# ─── Compact topics (log-compacted retention — latest state per key) ───
# user-profile, product-catalog

# Apache Flink — true stream processing
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Define Kafka source
t_env.execute_sql('''
CREATE TABLE events (
event_type STRING,
user_id STRING,
amount DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-processor',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
''')
# Windowed aggregation
result = t_env.sql_query('''
SELECT
window_start,
window_end,
event_type,
COUNT(*) as event_count,
SUM(amount) as total_amount
FROM TABLE(
HOP(TABLE events, DESCRIPTOR(event_time),
INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, event_type
''')
# Write to sink
result.execute_insert('analytics_sink')| Guarantee | Description | Trade-off |
|---|---|---|
| At-most-once | Messages may be lost | Fastest, lowest latency |
| At-least-once | Messages may be duplicated | Standard, may need dedup |
| Exactly-once | Each message processed once | Slowest, requires coordination |
💡 Exactly-once in Kafka requires enable.idempotence=true on the producer, isolation.level=read_committed on the consumer, and transactions in your processing engine.

# dbt_project.yml
name: my_project
version: '1.0.0'
config-version: 2
profile: 'my_profile'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

# Per-folder defaults: staging builds as views, intermediate is inlined
# (ephemeral), marts are materialized as tables in their own schemas.
models:
  my_project:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: marts
      finance:
        +schema: finance_mart

-- models/staging/stg_orders.sql
{{
  config(
    materialized='view',
    tags=['daily', 'orders']
  )
}}

WITH source AS (
    SELECT * FROM {{ source('erp', 'raw_orders') }}
),

cleaned AS (
    SELECT
        order_id,
        customer_id,
        COALESCE(status, 'unknown') AS status,
        TRY_CAST(order_date AS DATE) AS order_date,
        TRY_CAST(amount AS DECIMAL(12,2)) AS amount,
        -- Row-level data quality flag for downstream triage.
        CASE
            WHEN amount < 0 THEN 'error'
            WHEN customer_id IS NULL THEN 'warning'
            ELSE 'ok'
        END AS dq_flag
    FROM source
    WHERE order_date >= '2020-01-01'
      AND order_id IS NOT NULL
)

SELECT * FROM cleaned

-- models/marts/core/metrics.sql
{{
  config(
    materialized='incremental',
    unique_key='metric_date',
    cluster_by=['metric_date']
  )
}}

SELECT
    date_trunc('day', order_date) AS metric_date,
    COUNT(DISTINCT order_id) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
  -- Only process days newer than what's already in the target table.
  WHERE order_date > (SELECT COALESCE(MAX(metric_date), '1900-01-01')
                      FROM {{ this }})
{% endif %}
GROUP BY 1

# models/marts/schema.yml
version: 2

models:
  - name: customer_metrics
    description: "Daily customer engagement metrics"
    columns:
      - name: metric_date
        description: "Aggregation date"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "< CURRENT_DATE"
      - name: total_revenue
        description: "Total revenue for the period"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: true
    # Model-level test: one row per (metric_date, customer_segment) pair.
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - metric_date
            - customer_segment

| Tool | Language | Strengths | Best For |
|---|---|---|---|
| Apache Airflow | Python | Mature, huge ecosystem | Batch ETL, enterprise |
| Dagster | Python | Software-defined assets, types | Modern data platforms |
| Prefect | Python | Simple API, async native | Python-native teams |
| Mage | Python / YAML | Notebook-like, interactive | Quick prototyping |
| dbt Cloud | SQL | Native dbt orchestration | Transform-only workflows |
| AWS Step Functions | JSON / YAML | Serverless, AWS native | AWS-centric stacks |
| Temporal | Go / Java | Durable execution | Long-running workflows |
💡 Use materialized='incremental' in dbt for large tables — it only processes new/changed data instead of rebuilding the entire model, reducing costs dramatically.

| Dimension | Definition | Example Check |
|---|---|---|
| Completeness | No missing required data | NOT NULL constraints, null rate < 1% |
| Accuracy | Data matches reality | Referential integrity, range checks |
| Consistency | Same data across systems | Hash comparison between source & target |
| Timeliness | Data arrives on schedule | Freshness checks, SLA monitoring |
| Uniqueness | No duplicate records | Primary key, composite unique constraints |
| Validity | Conforms to format/rules | Regex, enum, type, length checks |
| Integrity | Relationships are valid | Foreign key, orphan record detection |
# ─── Naming Conventions ───
# Placeholder templates ({...}) show how to derive each name.
naming = {
    'tables': {
        'raw': 'raw_{source}_{entity}',            # raw_postgres_orders
        'staging': 'stg_{source}_{entity}',        # stg_postgres_orders
        'intermediate': 'int_{description}',       # int_order_customer_agg
        'mart': 'fct_{entity} / dim_{entity}',     # fct_orders, dim_customers
    },
    'columns': {
        'primary_key': '{entity}_id',              # order_id
        'foreign_key': '{entity}_id',              # customer_id
        'boolean': 'is_{adjective}',               # is_active
        'timestamp': '{entity}_at / {entity}_ts',  # created_at
        'count': '{entity}_count',                 # order_count
    },
    'files': {
        'models': 'snake_case.sql',
        'macros': 'snake_case.sql',
        'tests': 'snake_case.yml',
    },
}

# ─── Monitoring & Alerting ───
# What to watch per layer, and where alerts should land.
monitoring = {
    'pipeline_health': [
        'Success / failure rate per DAG',
        'Task duration vs SLA',
        'Queue time (waiting for resources)',
        'Data freshness (time since last update)',
    ],
    'data_quality': [
        'Row count anomalies (sudden drops/spikes)',
        'Schema changes (new/removed columns)',
        'Null rate trends per column',
        'Referential integrity violations',
        'Distribution drift (statistical tests)',
    ],
    'infrastructure': [
        'Compute utilization (CPU, memory)',
        'Storage growth rate',
        'Queue depth (backlog)',
        'Query performance (slow queries)',
    ],
    'alerting_channels': [
        'Slack / Teams webhook on failure',
        'PagerDuty for critical SLA breaches',
        'Email digest for data quality reports',
        'Dashboard (Grafana / Metabase) for trends',
    ],
}

# ─── CI/CD for Data Pipelines ───
# .github/workflows/dbt_ci.yml
# on: pull_request
# jobs:
# dbt-test:
# - checkout
# - dbt deps
# - dbt compile # syntax check
# - dbt lint # SQL best practices (sqlfluff)
# - dbt run --select staging # build staging models
# - dbt test # run all tests
# dbt-deploy:
# needs: dbt-test
# if: github.ref == 'refs/heads/main'
# - dbt run --target prod
# - dbt run-operation dbt_docs_generate
# - notify_slack("Deploy successful")
# Key principles:
# 1. Every change goes through a PR
# 2. CI runs compile + test on every PR
# 3. Merge to main triggers prod deployment
# 4. Schema migrations are backward-compatible
# 5. Breaking changes are versioned and communicated

💡 Track data lineage with dbt docs or a data catalog. When something breaks, lineage is your fastest debugging tool.