Home
cd ../playbooks
Developer ToolsAdvanced

ETL Pipeline Builder

Design and automate Extract, Transform, Load data pipelines for data integration and analytics

10 minutes
By communitySource
#etl#data-pipeline#integration#analytics#automation
CLAUDE.md Template

Download this file and place it in your project folder to get started.

# ETL Pipeline

Comprehensive workflow for designing and automating Extract, Transform, Load data pipelines.

## Pipeline Architecture

### Core ETL Flow

```
DATA PIPELINE ARCHITECTURE:
┌─────────────────────────────────────────────────────────┐
│                     EXTRACT                              │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│ Postgres│  MySQL  │ MongoDB │  APIs   │  Files (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
     │         │         │         │              │
     └─────────┴─────────┴────┬────┴──────────────┘
                              ▼
┌─────────────────────────────────────────────────────────┐
│                    TRANSFORM                             │
│  • Clean & Validate    • Aggregate & Join               │
│  • Normalize           • Calculate Metrics              │
│  • Deduplicate         • Apply Business Rules           │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                      LOAD                                │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  BigQuery   │  Snowflake  │  Redshift   │  Data Lake    │
└─────────────┴─────────────┴─────────────┴───────────────┘
```

## Source Connectors

### Database Connections

```yaml
sources:
  postgres:
    type: postgresql
    host: db.example.com
    port: 5432
    database: production
    ssl: true
    extraction:
      method: incremental
      key: updated_at
      batch_size: 10000

  mysql:
    type: mysql
    host: mysql.example.com
    port: 3306
    database: analytics
    extraction:
      method: cdc
      binlog: true

  mongodb:
    type: mongodb
    connection_string: mongodb+srv://...
    database: app_data
    extraction:
      method: change_streams
```

### API Sources

```yaml
api_sources:
  stripe:
    type: rest_api
    base_url: https://api.stripe.com/v1
    auth: bearer_token
    endpoints:
      - /charges
      - /customers
      - /subscriptions
    pagination: cursor
    rate_limit: 100/second

  salesforce:
    type: salesforce
    instance_url: https://company.salesforce.com
    auth: oauth2
    objects:
      - Account
      - Opportunity
      - Contact
    bulk_api: true
```

## Transformation Layer

### Common Transformations

```python
# Data Cleaning
transformations = {
    "clean_nulls": {
        "operation": "fill_null",
        "columns": ["email", "phone"],
        "value": "unknown"
    },
    
    "standardize_dates": {
        "operation": "date_parse",
        "columns": ["created_at", "updated_at"],
        "format": "ISO8601"
    },
    
    "normalize_currency": {
        "operation": "convert_currency",
        "source_column": "amount",
        "currency_column": "currency",
        "target": "USD"
    },
    
    "deduplicate": {
        "operation": "distinct",
        "key_columns": ["customer_id", "transaction_id"],
        "keep": "latest"
    }
}
```

### Aggregation Rules

```sql
-- Daily Revenue Aggregation
SELECT 
    DATE(created_at) as date,
    product_category,
    COUNT(*) as transactions,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_order_value,
    COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE created_at >= '${start_date}'
GROUP BY 1, 2
```

### Join Operations

```yaml
joins:
  - name: enrich_orders
    left: orders
    right: customers
    type: left
    on:
      - left: customer_id
        right: id
    select:
      - orders.*
      - customers.email
      - customers.segment
      - customers.lifetime_value

  - name: add_product_details
    left: enriched_orders
    right: products
    type: left
    on:
      - left: product_id
        right: id
```

## Load Strategies

### BigQuery Load

```yaml
bigquery_load:
  project: my-project
  dataset: analytics
  table: fact_orders
  
  schema:
    - name: order_id
      type: STRING
      mode: REQUIRED
    - name: customer_id
      type: STRING
    - name: amount
      type: NUMERIC
    - name: created_at
      type: TIMESTAMP
  
  load_config:
    write_disposition: WRITE_APPEND
    create_disposition: CREATE_IF_NEEDED
    clustering_fields: [customer_id]
    time_partitioning:
      field: created_at
      type: DAY
```

### Snowflake Load

```yaml
snowflake_load:
  warehouse: ETL_WH
  database: ANALYTICS
  schema: PUBLIC
  table: FACT_ORDERS
  
  staging:
    stage: '@MY_STAGE'
    file_format: JSON
  
  copy_options:
    on_error: CONTINUE
    purge: true
    match_by_column_name: CASE_INSENSITIVE
```

## Pipeline Orchestration

### DAG Definition

```yaml
pipeline:
  name: daily_analytics_etl
  schedule: "0 2 * * *"  # 2 AM daily
  
  tasks:
    - id: extract_orders
      type: extract
      source: postgres
      query: "SELECT * FROM orders WHERE date = '${execution_date}'"
      
    - id: extract_customers
      type: extract
      source: postgres
      query: "SELECT * FROM customers"
      
    - id: transform_data
      type: transform
      dependencies: [extract_orders, extract_customers]
      operations:
        - join_customers
        - calculate_metrics
        - apply_business_rules
      
    - id: load_warehouse
      type: load
      dependencies: [transform_data]
      target: bigquery
      table: fact_orders
      
    - id: notify_complete
      type: notification
      dependencies: [load_warehouse]
      channel: slack
      message: "Daily ETL completed successfully"
```

### Error Handling

```yaml
error_handling:
  retry:
    max_attempts: 3
    delay_seconds: 300
    exponential_backoff: true
  
  on_failure:
    - log_error
    - send_alert
    - save_failed_records
  
  dead_letter:
    enabled: true
    destination: s3://etl-errors/
    retention_days: 30
```

## Data Quality

### Validation Rules

```yaml
quality_checks:
  - name: null_check
    column: customer_id
    rule: not_null
    severity: error
    
  - name: range_check
    column: amount
    rule: between
    min: 0
    max: 100000
    severity: warning
    
  - name: uniqueness
    columns: [order_id]
    rule: unique
    severity: error
    
  - name: referential_integrity
    column: product_id
    reference_table: products
    reference_column: id
    severity: error
    
  - name: freshness
    column: updated_at
    rule: max_age_hours
    value: 24
    severity: warning
```

### Quality Metrics Dashboard

```
DATA QUALITY REPORT - ${date}
═══════════════════════════════════════
Total Records Processed: 1,250,000
Passed Validation:       1,247,500 (99.8%)
Failed Validation:           2,500 (0.2%)

ISSUES BY TYPE:
┌─────────────────┬────────┬──────────┐
│ Issue Type      │ Count  │ Severity │
├─────────────────┼────────┼──────────┤
│ Null values     │ 1,200  │ Warning  │
│ Invalid format  │   850  │ Error    │
│ Out of range    │   300  │ Warning  │
│ Duplicates      │   150  │ Error    │
└─────────────────┴────────┴──────────┘
```

## Monitoring & Alerting

### Pipeline Metrics

```yaml
metrics:
  - name: pipeline_duration
    type: gauge
    labels: [pipeline_name, status]
    
  - name: records_processed
    type: counter
    labels: [pipeline_name, source, destination]
    
  - name: error_count
    type: counter
    labels: [pipeline_name, error_type]
    
  - name: data_freshness
    type: gauge
    labels: [table_name]
```

### Alert Configuration

```yaml
alerts:
  - name: pipeline_failed
    condition: status == 'failed'
    channels: [pagerduty, slack]
    
  - name: high_error_rate
    condition: error_rate > 0.05
    channels: [slack]
    
  - name: slow_pipeline
    condition: duration > 2 * avg_duration
    channels: [slack]
    
  - name: data_freshness
    condition: freshness_hours > 24
    channels: [email]
```

## Best Practices

1. **Incremental Loading**: Use incremental extraction when possible
2. **Idempotency**: Ensure pipelines can be re-run safely
3. **Partitioning**: Partition large tables by date
4. **Monitoring**: Track pipeline health metrics
5. **Documentation**: Document all transformations
6. **Testing**: Test with sample data before production
7. **Version Control**: Track pipeline changes in git
README.md

What This Does

Comprehensive workflow for designing and automating Extract, Transform, Load data pipelines.


Quick Start

Step 1: Create a Project Folder

mkdir -p ~/Documents/EtlPipeline

Step 2: Download the Template

Click Download above, then:

mv ~/Downloads/CLAUDE.md ~/Documents/EtlPipeline/

Step 3: Start Working

cd ~/Documents/EtlPipeline
claude

Best Practices

  1. Incremental Loading: Use incremental extraction when possible
  2. Idempotency: Ensure pipelines can be re-run safely
  3. Partitioning: Partition large tables by date
  4. Monitoring: Track pipeline health metrics
  5. Documentation: Document all transformations
  6. Testing: Test with sample data before production
  7. Version Control: Track pipeline changes in git

$Related Playbooks