
DAG Factory Guide

The DAG Factory is xo-foundry's system for generating production-ready Airflow DAGs from YAML configuration files. It uses Pydantic for validation and Jinja2 for template rendering.

How It Works

YAML Config  →  Pydantic Validation  →  Jinja2 Template  →  Python DAG File
(developer)     (dag_config.py)         (templates/)        (dags/ directory)
  1. Author: Developer writes a YAML config defining the pipeline
  2. Validate: Config is parsed and validated against DAGConfig Pydantic model
  3. Build paths: S3 paths are constructed using path_builder.py based on domain, report, and load strategy
  4. Render: Jinja2 templates generate complete Python DAG files with TaskFlow API decorators
  5. Deploy: Generated .py files are placed in the Airflow DAGs directory
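The five steps above can be sketched end to end. This is a minimal illustration using stdlib stand-ins only: a dataclass stands in for the Pydantic model and string.Template for the Jinja2 template. The names (DagMeta, generate_dag, DAG_TEMPLATE) are illustrative, not the actual xo-foundry API.

```python
from dataclasses import dataclass
from string import Template

@dataclass
class DagMeta:
    """Stand-in for the Pydantic DAGMetadata model."""
    domain: str
    pipeline_name: str
    schedule: str

    def __post_init__(self):
        # Parse-time validation: fail before any rendering happens.
        if not self.schedule.strip():
            raise ValueError("schedule must be a cron expression")

# Stand-in for a Jinja2 template under templates/.
DAG_TEMPLATE = Template(
    'dag_id = "${domain}__${pipeline_name}"\n'
    'schedule = "${schedule}"\n'
)

def generate_dag(meta: DagMeta) -> str:
    """Validate config metadata, then render the DAG file body."""
    return DAG_TEMPLATE.substitute(
        domain=meta.domain,
        pipeline_name=meta.pipeline_name,
        schedule=meta.schedule,
    )

meta = DagMeta(domain="warbyparker", pipeline_name="gladly_daily", schedule="50 6 * * *")
print(generate_dag(meta))
```

The real factory renders a full TaskFlow-decorated DAG file rather than two lines, but the shape is the same: validate first, render second, so a bad config never produces a broken .py file.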

CLI Commands

Generate a DAG

uv run xo-foundry generate-dag \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --output apps/airflow/xo-pipelines/dags/

This reads the YAML config, validates it, and generates a Python DAG file in the output directory.

Validate a Config

uv run xo-foundry validate-config \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml

Validates the config without generating a DAG. Useful for CI/CD checks.

YAML Configuration

Full Annotated Example

Based on the production warbyparker-gladly-daily.yaml:

# ─── DAG Metadata ───────────────────────────────────────────
dag:
  domain: warbyparker                    # Client domain (S3 paths, DAG name)
  pipeline_name: gladly_daily            # Pipeline identifier
  pipeline_type: snowflake_load          # snowflake_load | intraday_refresh | legacy_elt
  schedule: "50 6 * * *"                 # Cron expression (6:50 AM daily)
  time_window:
    refresh_type: daily                  # daily | intraday
    lag:
      days: 1                            # Extract yesterday's data
    timezone: "America/New_York"         # Timezone for date calculations

# ─── Global Settings ────────────────────────────────────────
globals:
  snowflake:
    database: WBP_DB_DEV                 # Target Snowflake database
    schema: BRONZE                       # Target schema
    connection_id: snowflake_dev         # Airflow connection ID
    warehouse: XO_DEV_WH                # Snowflake warehouse

# ─── Sources (one per Bronze table) ─────────────────────────
sources:
  contact_timestamps:                    # Source key (used in task naming)
    source_type: gladly_api              # gladly_api | sprout_api | gsheet | s3 | gmail
    load_strategy: full_refresh          # full_refresh | incremental | historical

    extractor:                           # Source-specific extractor config
      metric_set: ContactTimestampsReport
      base_url_var: WARBYPARKER_GLADLY_BASE_URL
      email_var: WARBYPARKER_GLADLY_EMAIL
      token_var: WARBYPARKER_GLADLY_API_TOKEN

    paths:                               # S3 path configuration
      report_name: contact_timestamps
      filename_pattern: "contact_timestamps_{date}.csv"

    snowflake:                           # Snowflake loading config
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: truncate_insert     # Always truncate_insert for Bronze
      deduplication:
        strategy: composite_key
        unique_columns:
          - CONTACT_ID
          - TIMESTAMP_FIELD
          - EVENT_TYPE
          - INITIATOR_ID
          - MESSAGE_ID
          - TARGET_AGENT_ID
        use_hash: true                   # Hash unique columns for RECORD_KEY

  work_sessions:                         # Second source in same pipeline
    source_type: gladly_api
    load_strategy: full_refresh
    extractor:
      metric_set: WorkSessionsReport
      # ... (same auth vars)
    paths:
      report_name: work_sessions
      filename_pattern: "work_sessions_{date}.csv"
    snowflake:
      target_table: GLADLY_WORK_SESSIONS
      load_strategy: truncate_insert
      deduplication:
        strategy: composite_key
        unique_columns: [SESSION_ID, AGENT_ID, START_TIME]
        use_hash: true
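The use_hash option above collapses the unique_columns into a single dedup key. A hedged sketch of that idea: the separator and hash algorithm here are assumptions for illustration, not necessarily what xo-foundry uses to compute RECORD_KEY.

```python
import hashlib

# Columns taken from the work_sessions config above.
UNIQUE_COLUMNS = ["SESSION_ID", "AGENT_ID", "START_TIME"]

def record_key(row: dict) -> str:
    """Concatenate the composite-key columns and hash them into one key."""
    parts = [str(row.get(col, "")) for col in UNIQUE_COLUMNS]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()

row = {"SESSION_ID": "s1", "AGENT_ID": "a9", "START_TIME": "2026-01-15T06:50:00"}
print(record_key(row))
```

The point of hashing is that a six-column composite key (as in contact_timestamps) becomes one fixed-width column to compare during deduplication.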

Production configs location

Production YAML configs live in apps/airflow/xo-pipelines/dags/configs/ (source of truth). packages/xo-foundry/configs/ is for examples and templates only.

Pipeline Types

snowflake_load

Standard pipeline: Extract → Stage → COPY INTO Bronze.

dag:
  pipeline_type: snowflake_load

Generated flow: each source gets its own extract → stage → load task chain.

intraday_refresh

Multiple refreshes per day with overlapping time windows (ADR 002).

dag:
  pipeline_type: intraday_refresh
  time_window:
    refresh_type: intraday
    lag: { minutes: 30 }
    lookback: "8 hours"

legacy_elt

For MERGE operations into existing tables (OPERATIONS database migrations).

dag:
  pipeline_type: legacy_elt

Generated flow: Uses merge_tasks.py instead of snowflake_tasks.py.

S3 Path Construction

The path_builder.py module constructs S3 paths from config:

Ingest: s3://xo-ingest/{domain}/{report_name}/{date}/{filename}
Stage:  s3://xo-stage/{domain}/{report_name}/{load_strategy}/{date_or_timestamp}/{filename}

where {filename} is rendered from the source's filename_pattern.

Examples:

s3://xo-ingest/warbyparker/contact_timestamps/2026-01-15/contact_timestamps_2026-01-15.csv
s3://xo-stage/warbyparker/contact_timestamps/full_refresh/2026-01-15/contact_timestamps_2026-01-15.csv

The load_strategy segment in the stage path (full_refresh, incremental, historical) organizes files by their loading pattern per ADR 001.
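The two path templates reduce to simple string assembly. A hedged sketch of that construction; these helper names are hypothetical, not the real path_builder.py API.

```python
def build_ingest_path(domain: str, report: str, date: str, filename: str) -> str:
    """Ingest bucket: raw extracts, keyed by domain/report/date."""
    return f"s3://xo-ingest/{domain}/{report}/{date}/{filename}"

def build_stage_path(domain: str, report: str, load_strategy: str,
                     date: str, filename: str) -> str:
    """Stage bucket: adds the load_strategy segment per ADR 001."""
    return f"s3://xo-stage/{domain}/{report}/{load_strategy}/{date}/{filename}"

print(build_stage_path("warbyparker", "contact_timestamps", "full_refresh",
                       "2026-01-15", "contact_timestamps_2026-01-15.csv"))
```

Running this reproduces the stage example shown above.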

Time Window Configuration

Daily Mode

time_window:
  refresh_type: daily
  lag: { days: 1 }           # Yesterday
  timezone: "America/New_York"

Output: Single date string (e.g., 2026-01-14)

Daily (No Lag)

time_window:
  refresh_type: daily
  timezone: "America/New_York"

Output: Current date (e.g., 2026-01-15)

Intraday Relative

time_window:
  refresh_type: intraday
  lag: { minutes: 30 }
  lookback: "8 hours"
  timezone: "America/New_York"

Output: Start/end ISO8601 timestamps based on execution time

Intraday Absolute

time_window:
  refresh_type: intraday
  start_time: "2026-01-15T17:00:00"
  end_time: "2026-01-16T09:00:00"
  timezone: "America/New_York"

Output: Fixed start/end timestamps
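The daily and intraday relative modes above boil down to timezone-aware date arithmetic. A stdlib-only sketch, with illustrative function names; the actual implementation lives in xo-foundry's time-window code.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def daily_date(now: datetime, lag_days: int = 0, tz: str = "America/New_York") -> str:
    """Daily mode: single date string, lagged in the configured timezone."""
    local = now.astimezone(ZoneInfo(tz))
    return (local - timedelta(days=lag_days)).date().isoformat()

def intraday_window(now: datetime, lag: timedelta, lookback: timedelta,
                    tz: str = "America/New_York") -> tuple[str, str]:
    """Intraday relative mode: ISO8601 start/end anchored to execution time."""
    end = now.astimezone(ZoneInfo(tz)) - lag
    start = end - lookback
    return start.isoformat(), end.isoformat()

# 11:50 UTC on Jan 15 is 06:50 in New York (EST), matching the 6:50 AM schedule.
run = datetime(2026, 1, 15, 11, 50, tzinfo=ZoneInfo("UTC"))
print(daily_date(run, lag_days=1))  # yesterday in local time
print(intraday_window(run, timedelta(minutes=30), timedelta(hours=8)))
```

Anchoring the lag in the configured timezone matters: near midnight, UTC and America/New_York can disagree about which day "yesterday" is.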

LagConfig

The lag field uses a structured model:

lag: { days: 1 }          # 1 day
lag: { hours: 8 }         # 8 hours
lag: { minutes: 30 }      # 30 minutes
lag: { days: 1, hours: 6 } # 1 day and 6 hours

Omitting lag means zero lag (current day/time).
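The lag variants above all reduce to a single timedelta. A minimal stdlib sketch (the real LagConfig is a Pydantic model, per the schema section below, so the dataclass here is a stand-in):

```python
from dataclasses import dataclass
from datetime import timedelta

@dataclass
class LagConfig:
    """Structured lag: any combination of days, hours, and minutes."""
    days: int = 0
    hours: int = 0
    minutes: int = 0

    def to_timedelta(self) -> timedelta:
        return timedelta(days=self.days, hours=self.hours, minutes=self.minutes)

print(LagConfig(days=1, hours=6).to_timedelta())  # 1 day, 6:00:00
print(LagConfig().to_timedelta())                 # 0:00:00 -- omitted lag means zero lag
```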

Multi-Source Pipelines

A single YAML config can define multiple sources that are extracted in the same DAG:

sources:
  contact_timestamps:
    source_type: gladly_api
    # ...

  work_sessions:
    source_type: gladly_api
    # ...

  conversation_timestamps:
    source_type: gladly_api
    # ...

  agent_durations:
    source_type: gladly_api
    # ...

Each source generates its own extract → stage → load task chain within the DAG. Task groups parallelize independent sources.
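How the sources mapping expands into per-source chains can be sketched in plain Python; the task names below are purely illustrative, not the generated TaskFlow code.

```python
# Source keys from the multi-source example above.
sources = ["contact_timestamps", "work_sessions",
           "conversation_timestamps", "agent_durations"]

def task_chain(source: str) -> list[str]:
    """Each source expands into an ordered extract -> stage -> load chain."""
    return [f"{step}_{source}" for step in ("extract", "stage", "load")]

# Chains are independent of each other, which is what lets the
# generated task groups run sources in parallel.
chains = {s: task_chain(s) for s in sources}
print(chains["work_sessions"])
```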

Pydantic Schema

The configuration schema lives in packages/xo-foundry/src/xo_foundry/schemas/dag_config.py.

Key Models

Model                 Fields
DAGConfig             dag, globals, sources
DAGMetadata           domain, pipeline_name, pipeline_type, schedule, time_window
TimeWindowConfig      refresh_type, lag, lookback, timezone, start_time, end_time
LagConfig             days, hours, minutes
GlobalConfig          snowflake (database, schema, connection_id, warehouse)
SourceConfig          source_type, load_strategy, extractor, paths, snowflake, json_columns
ExtractorConfig       polymorphic -- varies by source_type (Gladly, Sprout, GSheet, etc.)
DeduplicationConfig   strategy, unique_columns, use_hash

Validation

Pydantic validates at parse time:

  • Required fields are present
  • Enum values are valid (pipeline_type, source_type, load_strategy)
  • Time window configurations are internally consistent
  • Extractor configs match their source_type
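The enum check in particular means a typo fails at parse time, before any DAG is generated. A sketch with a stdlib stand-in for the Pydantic model; the real checks live in dag_config.py.

```python
from dataclasses import dataclass

# Valid pipeline_type values, as listed in the config reference above.
PIPELINE_TYPES = {"snowflake_load", "intraday_refresh", "legacy_elt"}

@dataclass
class DAGMetadata:
    pipeline_type: str

    def __post_init__(self):
        # Reject unknown enum values at construction time,
        # the way Pydantic rejects them at parse time.
        if self.pipeline_type not in PIPELINE_TYPES:
            raise ValueError(f"invalid pipeline_type: {self.pipeline_type!r}")

try:
    DAGMetadata(pipeline_type="snowflake_laod")  # typo caught before rendering
except ValueError as e:
    print(e)
```

This is what makes validate-config a cheap CI gate: a bad config never reaches template rendering, let alone the Airflow scheduler.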

JSON Column Handling

For sources with JSON columns (e.g., Sprout API):

sources:
  sprout_messages:
    source_type: sprout_api
    json_columns:
      - activity_metadata

This enables JSON normalization in RECORD_HASH calculations, preventing phantom updates from key ordering differences.
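The phantom-update problem is easy to demonstrate: two JSON payloads with identical content but different key order are different strings, so hashing the raw text yields different hashes. Normalizing first fixes that. A stdlib sketch; the exact normalization xo-foundry applies is an assumption here.

```python
import hashlib
import json

def normalized_hash(value: str) -> str:
    """Parse, re-serialize with sorted keys, then hash the canonical form."""
    canonical = json.dumps(json.loads(value), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Same payload, different key order -- must hash identically.
a = '{"type": "reply", "network": "twitter"}'
b = '{"network": "twitter", "type": "reply"}'
print(normalized_hash(a) == normalized_hash(b))  # True
```

Without this step, every re-extract that happens to serialize keys in a different order would look like a changed record and trigger a spurious update.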

Environment-Specific Config

The YAML config references environment-specific values via Airflow Variables and Connections:

globals:
  snowflake:
    database: WBP_DB_DEV           # Dev database
    connection_id: snowflake_dev   # Dev Airflow connection

sources:
  contact_timestamps:
    extractor:
      base_url_var: WARBYPARKER_GLADLY_BASE_URL  # Airflow Variable
      email_var: WARBYPARKER_GLADLY_EMAIL         # Airflow Variable
      token_var: WARBYPARKER_GLADLY_API_TOKEN     # Airflow Variable

For production, these values are configured in the production Airflow environment (different connection IDs, database names, etc.).

Component Reference

File                                   Purpose
dag_factory/factory.py                 Main DAGFactory class -- orchestrates validation and rendering
dag_factory/builders/path_builder.py   S3 path construction from config
dag_factory/templates/                 Jinja2 templates for DAG Python files
schemas/dag_config.py                  Pydantic models for config validation
cli/generate_dags.py                   CLI entry point (xo-foundry generate-dag)