DAG Factory Guide

The DAG Factory is xo-foundry's system for generating production-ready Airflow DAGs from YAML configuration files. It uses Pydantic for validation and Jinja2 for template rendering.

How It Works

YAML Config  →  Pydantic Validation  →  Jinja2 Template  →  Python DAG File
(developer)     (dag_config.py)         (templates/)        (dags/ directory)
  1. Author: Developer writes a YAML config defining the pipeline
  2. Validate: Config is parsed and validated against DAGConfig Pydantic model
  3. Build paths: S3 paths are constructed using path_builder.py based on domain, report, and load strategy
  4. Render: Jinja2 templates generate complete Python DAG files with TaskFlow API decorators
  5. Deploy: Generated .py files are placed in the Airflow DAGs directory
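The five steps above can be sketched end to end. This is a simplified illustration only: the stand-in DAGConfig model, template filename, and output naming here are assumptions, not the real factory internals (step 3, path building, is covered under S3 Path Construction).

```python
from pathlib import Path

import yaml
from jinja2 import Environment, FileSystemLoader
from pydantic import BaseModel

# Hypothetical stand-in for the real DAGConfig in schemas/dag_config.py,
# which validates many more fields.
class DAGConfig(BaseModel):
    domain: str
    pipeline_name: str

def generate_dag(config_path: str, output_dir: str, template_dir: str) -> Path:
    raw = yaml.safe_load(Path(config_path).read_text())          # 1. Author
    config = DAGConfig(**raw["dag"])                             # 2. Validate
    env = Environment(loader=FileSystemLoader(template_dir))
    rendered = env.get_template("dag.py.j2").render(cfg=config)  # 4. Render
    out = Path(output_dir) / f"{config.domain}_{config.pipeline_name}.py"
    out.write_text(rendered)                                     # 5. Deploy
    return out
```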

CLI Commands

Generate a DAG

uv run xo-foundry generate-dag \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --output apps/airflow/xo-pipelines/dags/

This reads the YAML config, validates it, and generates a Python DAG file in the output directory.

Validate a Config

uv run xo-foundry validate-config \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml

Validates the config without generating a DAG. Useful for CI/CD checks.

YAML Configuration

Full Annotated Example

Based on the production warbyparker-gladly-daily.yaml:

# ─── DAG Metadata ───────────────────────────────────────────
dag:
  domain: warbyparker                    # Client domain (S3 paths, DAG name)
  pipeline_name: gladly_daily            # Pipeline identifier
  pipeline_type: snowflake_load          # snowflake_load | intraday_refresh | legacy_elt
  schedule: "50 6 * * *"                 # Cron expression (6:50 AM daily)
  time_window:
    refresh_type: daily                  # daily | intraday
    lag:
      days: 1                            # Extract yesterday's data
    timezone: "America/New_York"         # Timezone for date calculations

# ─── Global Settings ────────────────────────────────────────
globals:
  snowflake:
    database: WBP_DB_DEV                 # Target Snowflake database
    schema: BRONZE                       # Target schema
    connection_id: snowflake_dev         # Airflow connection ID
    warehouse: XO_DEV_WH                # Snowflake warehouse

# ─── Sources (one per Bronze table) ─────────────────────────
sources:
  contact_timestamps:                    # Source key (used in task naming)
    source_type: gladly_api              # gladly_api | sprout_api | gsheet | s3 | gmail
    load_strategy: full_refresh          # full_refresh | incremental | historical

    extractor:                           # Source-specific extractor config
      metric_set: ContactTimestampsReport
      base_url_var: WARBYPARKER_GLADLY_BASE_URL
      email_var: WARBYPARKER_GLADLY_EMAIL
      token_var: WARBYPARKER_GLADLY_API_TOKEN

    paths:                               # S3 path configuration
      report_name: contact_timestamps
      filename_pattern: "contact_timestamps_{date}.csv"

    snowflake:                           # Snowflake loading config
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: batch_replace       # Required — no default. batch_replace for new pipelines.
      deduplication:
        strategy: composite_key
        unique_columns:
          - CONTACT_ID
          - TIMESTAMP_FIELD
          - EVENT_TYPE
          - INITIATOR_ID
          - MESSAGE_ID
          - TARGET_AGENT_ID
        use_hash: true                   # Hash unique columns for RECORD_KEY

  work_sessions:                         # Second source in same pipeline
    source_type: gladly_api
    load_strategy: full_refresh
    extractor:
      metric_set: WorkSessionsReport
      # ... (same auth vars)
    paths:
      report_name: work_sessions
      filename_pattern: "work_sessions_{date}.csv"
    snowflake:
      target_table: GLADLY_WORK_SESSIONS
      load_strategy: batch_replace
      deduplication:
        strategy: composite_key
        unique_columns: [SESSION_ID, AGENT_ID, START_TIME]
        use_hash: true
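The deduplication blocks above declare a composite key; conceptually, the loader joins the unique columns and (with use_hash: true) hashes them into a record key. A minimal sketch of that idea, assuming a '|' separator and SHA-256 (the production scheme may differ in both):

```python
import hashlib

def record_key(row: dict, unique_columns: list[str], use_hash: bool = True) -> str:
    """Build a dedup key from the configured unique columns.

    Illustrative only: separator and digest are assumptions, not the
    loader's actual implementation.
    """
    key = "|".join(str(row.get(col, "")) for col in unique_columns)
    return hashlib.sha256(key.encode("utf-8")).hexdigest() if use_hash else key
```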

Production configs location

Production YAML configs live in apps/airflow/xo-pipelines/dags/configs/ (source of truth). packages/xo-foundry/configs/ is for examples and templates only.

Pipeline Patterns

There are three pipeline workflow patterns, plus an intraday variant of the third. Choose based on how the Bronze load and dbt need to run relative to each other.

Pattern 1 — Legacy ELT (legacy_elt)

For OPERATIONS database migrations only. Uses TRUNCATE TABLE + SQL MERGE in Airflow tasks (no dbt). Not used for new pipelines.

dag:
  pipeline_type: legacy_elt

Snowflake load strategy: truncate_insert (only valid use of this strategy).


Pattern 2 — Load-Only or Load + Triggered dbt (snowflake_load)

Extract → Stage → Bronze load. dbt is either disabled entirely (load-only phase) or runs in a separately triggered dbt DAG.

Use for: simple single-table loads, pipelines where dbt models don't exist yet, or pipelines where Bronze and dbt need independent retry/scale.

dag:
  pipeline_type: snowflake_load

globals:
  dbt:
    enabled: false  # load-only; or configure trigger_dag for a separate dbt DAG

Snowflake load strategy: batch_replace (standard) or append (pure accumulation, no pre-cleanup).


Pattern 3 — Load + dbt in Same DAG (snowflake_load, most common)

Extract → Stage → Bronze load → dbt Cosmos DbtTaskGroup in the same DAG. Bronze failure prevents dbt from running — they succeed or fail as a unit.

Use for: most pipelines where Bronze and Silver/Gold transformations move together.

dag:
  pipeline_type: snowflake_load

globals:
  dbt:
    enabled: true
    project_dir: /usr/local/airflow/dbt/xo_medallion
    select: "tag:warbyparker_gladly"
    threads: 4

Snowflake load strategy: batch_replace.


Intraday Variant (intraday_refresh)

Multiple refreshes per day with overlapping time windows (ADR 002). Similar to Pattern 3 but with intraday time window calculation.

dag:
  pipeline_type: intraday_refresh
  time_window:
    refresh_type: intraday
    lag: { minutes: 30 }
    lookback: "8 hours"

Snowflake Load Strategies

The snowflake.load_strategy field in each source controls what SQL runs before COPY INTO. Required — no default. All strategies must be declared explicitly.

Strategy         SQL                                  When to Use
batch_replace    DELETE WHERE BATCH_ID + COPY INTO    Standard for new snowflake_load pipelines. Preserves history, idempotent on reruns.
append           COPY INTO only (no pre-cleanup)      Pure accumulation -- each run adds rows. For weekly snapshots or agent lists.
truncate_insert  TRUNCATE + COPY INTO                 Legacy legacy_elt only. Do not use in new pipelines.

An unknown or missing load_strategy raises at task runtime — there is no silent default.
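Conceptually, the strategy selects the SQL that runs before COPY INTO, and an unknown key fails loudly rather than falling back to a default. A hedged sketch of that dispatch (the SQL templates and function here are illustrative, not the loader's actual code):

```python
from typing import Optional

# Hypothetical strategy → pre-COPY SQL mapping; table/batch placeholders
# are assumptions for illustration.
PRE_COPY_SQL = {
    "batch_replace": "DELETE FROM {table} WHERE BATCH_ID = '{batch_id}'",
    "append": None,                                   # no pre-cleanup
    "truncate_insert": "TRUNCATE TABLE {table}",      # legacy_elt only
}

def pre_copy_statement(load_strategy: str, table: str, batch_id: str) -> Optional[str]:
    try:
        template = PRE_COPY_SQL[load_strategy]
    except KeyError:
        # No silent default: unknown strategies raise at task runtime.
        raise ValueError(f"Unknown load_strategy: {load_strategy!r}")
    return template.format(table=table, batch_id=batch_id) if template else None
```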

S3 Path Construction

The path_builder.py module constructs S3 paths from config:

Ingest: s3://xo-ingest/{domain}/{report_name}/{date}/{filename}
Stage:  s3://xo-stage/{domain}/{report_name}/{load_strategy}/{date_or_timestamp}/{filename}

Examples:

s3://xo-ingest/warbyparker/contact_timestamps/2026-01-15/contact_timestamps_2026-01-15.csv
s3://xo-stage/warbyparker/contact_timestamps/full_refresh/2026-01-15/contact_timestamps_2026-01-15.csv

The load_strategy segment in the stage path (full_refresh, incremental, historical) organizes files by their loading pattern per ADR 001.
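A minimal sketch of the stage-path layout shown above (segment order is taken from the examples; the real logic lives in path_builder.py):

```python
def stage_path(domain: str, report_name: str, load_strategy: str,
               date_or_ts: str, filename_pattern: str) -> str:
    """Build a stage-bucket S3 key from config fields.

    Sketch only: bucket name and segment order follow the documented
    examples, not path_builder.py itself.
    """
    filename = filename_pattern.format(date=date_or_ts)
    return (f"s3://xo-stage/{domain}/{report_name}/"
            f"{load_strategy}/{date_or_ts}/{filename}")
```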

Time Window Configuration

Daily Mode

time_window:
  refresh_type: daily
  lag: { days: 1 }           # Yesterday
  timezone: "America/New_York"

Output: Single date string (e.g., 2026-01-14)

Daily (No Lag)

time_window:
  refresh_type: daily
  timezone: "America/New_York"

Output: Current date (e.g., 2026-01-15)

Intraday Relative

time_window:
  refresh_type: intraday
  lag: { minutes: 30 }
  lookback: "8 hours"
  timezone: "America/New_York"

Output: Start/end ISO8601 timestamps based on execution time

Intraday Absolute

time_window:
  refresh_type: intraday
  start_time: "2026-01-15T17:00:00"
  end_time: "2026-01-16T09:00:00"
  timezone: "America/New_York"

Output: Fixed start/end timestamps

LagConfig

The lag field uses a structured model:

lag: { days: 1 }          # 1 day
lag: { hours: 8 }         # 8 hours
lag: { minutes: 30 }      # 30 minutes
lag: { days: 1, hours: 6 } # 1 day and 6 hours

Omitting lag means zero lag (current day/time).
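The daily-mode calculation above amounts to: shift the run time into the configured timezone, subtract the lag, and format a date. A simplified illustration (not the actual time-window code):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def window_date(now_utc: datetime, tz: str, days: int = 0,
                hours: int = 0, minutes: int = 0) -> str:
    """Daily-mode date: localize, apply lag, format YYYY-MM-DD."""
    local = now_utc.astimezone(ZoneInfo(tz))
    lagged = local - timedelta(days=days, hours=hours, minutes=minutes)
    return lagged.strftime("%Y-%m-%d")
```

With lag: { days: 1 } in America/New_York, a 06:50 local run on 2026-01-15 yields 2026-01-14, matching the daily example above; omitting the lag arguments yields the current local date.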

Multi-Source Pipelines

A single YAML config can define multiple sources that are extracted in the same DAG:

sources:
  contact_timestamps:
    source_type: gladly_api
    # ...

  work_sessions:
    source_type: gladly_api
    # ...

  conversation_timestamps:
    source_type: gladly_api
    # ...

  agent_durations:
    source_type: gladly_api
    # ...

Each source generates its own extract → stage → load task chain within the DAG. Task groups parallelize independent sources.
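The fan-out can be pictured as one independent chain per source key. This sketch uses assumed task-naming conventions, not the factory's actual task IDs:

```python
def build_task_graph(sources: dict) -> dict:
    """Map each source key to its extract → stage → load task IDs.

    Chains for different sources share no edges, which is what lets
    Airflow task groups run them in parallel. Naming is illustrative.
    """
    steps = ("extract", "stage", "load")
    return {key: [f"{key}.{step}" for step in steps] for key in sources}
```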

Pydantic Schema

The configuration schema lives in packages/xo-foundry/src/xo_foundry/schemas/dag_config.py.

Key Models

Model                 Fields
DAGConfig             dag, globals, sources
DAGMetadata           domain, pipeline_name, pipeline_type, schedule, time_window
TimeWindowConfig      refresh_type, lag, lookback, timezone, start_time, end_time
LagConfig             days, hours, minutes
GlobalConfig          snowflake (database, schema, connection_id, warehouse)
SourceConfig          source_type, load_strategy, extractor, paths, snowflake, json_columns
ExtractorConfig       Polymorphic -- varies by source_type (Gladly, Sprout, GSheet, etc.)
DeduplicationConfig   strategy, unique_columns, use_hash

Validation

Pydantic validates at parse time:

  • Required fields are present
  • Enum values are valid (pipeline_type, source_type, load_strategy)
  • Time window configurations are internally consistent
  • Extractor configs match their source_type
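As a simplified illustration of these parse-time checks, here are hedged stand-ins for two of the models (field names follow the table above, but the real models in dag_config.py carry more fields and custom validators):

```python
from typing import Literal, Optional

from pydantic import BaseModel

# Simplified stand-ins for schemas/dag_config.py; defaults and the
# consistency validators are omitted.
class LagConfig(BaseModel):
    days: int = 0
    hours: int = 0
    minutes: int = 0

class TimeWindowConfig(BaseModel):
    refresh_type: Literal["daily", "intraday"]  # enum check at parse time
    lag: Optional[LagConfig] = None
    lookback: Optional[str] = None
    timezone: str = "UTC"
```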

Header Normalization Options

expand_camelcase

Set on a source when the API returns camelCase JSON field names. Without it, emailAddress becomes EMAILADDRESS — unreadable and inconsistent. With it, word boundaries are expanded before uppercasing.

sources:
  agents:
    source_type: gladly_api
    load_strategy: historical
    expand_camelcase: true  # emailAddress → EMAIL_ADDRESS, agentId → AGENT_ID

    snowflake:
      target_table: GLADLY_AGENTS
      load_strategy: append

Applied in the standardize_headers staging task. Default: false (opt-in per source).
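The transformation amounts to: optionally insert underscores at lowercase-to-uppercase boundaries, then uppercase. A minimal illustration (the actual standardize_headers implementation may differ in edge cases like acronyms):

```python
import re

def normalize_header(name: str, expand_camelcase: bool = False) -> str:
    """Uppercase a header, optionally expanding camelCase boundaries."""
    if expand_camelcase:
        # Insert "_" wherever a lowercase letter or digit meets an uppercase letter.
        name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    return name.upper()
```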


JSON Column Handling

For sources with JSON columns (e.g., Sprout API):

sources:
  sprout_messages:
    source_type: sprout_api
    json_columns:
      - activity_metadata

This enables JSON normalization in RECORD_HASH calculations, preventing phantom updates from key ordering differences.
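The underlying idea is to re-serialize JSON values canonically (sorted keys, fixed separators) before hashing, so equivalent payloads hash identically regardless of key order. A minimal sketch, not the actual RECORD_HASH code:

```python
import json

def normalized_json(value: str) -> str:
    """Canonical JSON serialization: parse, then re-dump with sorted keys
    and fixed separators, so key ordering cannot change the hash input."""
    return json.dumps(json.loads(value), sort_keys=True, separators=(",", ":"))
```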

Environment-Specific Config

The YAML config references environment-specific values via Airflow Variables and Connections:

globals:
  snowflake:
    database: WBP_DB_DEV           # Dev database
    connection_id: snowflake_dev   # Dev Airflow connection

sources:
  contact_timestamps:
    extractor:
      base_url_var: WARBYPARKER_GLADLY_BASE_URL  # Airflow Variable
      email_var: WARBYPARKER_GLADLY_EMAIL         # Airflow Variable
      token_var: WARBYPARKER_GLADLY_API_TOKEN     # Airflow Variable

For production, these values are configured in the production Airflow environment (different connection IDs, database names, etc.).

Component Reference

File                                  Purpose
dag_factory/factory.py                Main DAGFactory class -- orchestrates validation and rendering
dag_factory/builders/path_builder.py  S3 path construction from config
dag_factory/templates/                Jinja2 templates for DAG Python files
schemas/dag_config.py                 Pydantic models for config validation
cli/generate_dags.py                  CLI entry point (xo-foundry generate-dag)