DAG Factory Guide¶
The DAG Factory is xo-foundry's system for generating production-ready Airflow DAGs from YAML configuration files. It uses Pydantic for validation and Jinja2 for template rendering.
How It Works¶
```
YAML Config  →  Pydantic Validation  →  Jinja2 Template  →  Python DAG File
(developer)     (dag_config.py)         (templates/)        (dags/ directory)
```
- Author: Developer writes a YAML config defining the pipeline
- Validate: Config is parsed and validated against the `DAGConfig` Pydantic model
- Build paths: S3 paths are constructed by `path_builder.py` based on domain, report, and load strategy
- Render: Jinja2 templates generate complete Python DAG files with TaskFlow API decorators
- Deploy: Generated `.py` files are placed in the Airflow DAGs directory
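The end-to-end flow can be sketched in a few lines. The snippet below uses a dataclass and `string.Template` as lightweight stand-ins for the real Pydantic models and Jinja2 templates; the class and field names mirror the config shown later in this guide but are otherwise illustrative.

```python
# End-to-end sketch of the steps above. A dataclass and string.Template
# stand in for the real Pydantic models and Jinja2 templates.
from dataclasses import dataclass
from string import Template

@dataclass
class DagMeta:  # stand-in for the DAGMetadata model
    domain: str
    pipeline_name: str
    schedule: str

DAG_TEMPLATE = Template(
    'dag_id = "${domain}_${pipeline_name}"\n'
    'schedule = "${schedule}"\n'
)

def render_dag(meta: DagMeta) -> str:
    """Render a (toy) DAG file body from validated metadata."""
    return DAG_TEMPLATE.substitute(
        domain=meta.domain,
        pipeline_name=meta.pipeline_name,
        schedule=meta.schedule,
    )

print(render_dag(DagMeta("warbyparker", "gladly_daily", "50 6 * * *")))
```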
CLI Commands¶
Generate a DAG¶
```shell
uv run xo-foundry generate-dag \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --output apps/airflow/xo-pipelines/dags/
```
This reads the YAML config, validates it, and generates a Python DAG file in the output directory.
Validate a Config¶
```shell
uv run xo-foundry validate-config \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml
```
Validates the config without generating a DAG. Useful for CI/CD checks.
YAML Configuration¶
Full Annotated Example¶
Based on the production warbyparker-gladly-daily.yaml:
```yaml
# ─── DAG Metadata ───────────────────────────────────────────
dag:
  domain: warbyparker              # Client domain (S3 paths, DAG name)
  pipeline_name: gladly_daily      # Pipeline identifier
  pipeline_type: snowflake_load    # snowflake_load | intraday_refresh | legacy_elt
  schedule: "50 6 * * *"           # Cron expression (6:50 AM daily)
  time_window:
    refresh_type: daily            # daily | intraday
    lag:
      days: 1                      # Extract yesterday's data
    timezone: "America/New_York"   # Timezone for date calculations

# ─── Global Settings ────────────────────────────────────────
globals:
  snowflake:
    database: WBP_DB_DEV           # Target Snowflake database
    schema: BRONZE                 # Target schema
    connection_id: snowflake_dev   # Airflow connection ID
    warehouse: XO_DEV_WH           # Snowflake warehouse

# ─── Sources (one per Bronze table) ─────────────────────────
sources:
  contact_timestamps:              # Source key (used in task naming)
    source_type: gladly_api        # gladly_api | sprout_api | gsheet | s3 | gmail
    load_strategy: full_refresh    # full_refresh | incremental | historical
    extractor:                     # Source-specific extractor config
      metric_set: ContactTimestampsReport
      base_url_var: WARBYPARKER_GLADLY_BASE_URL
      email_var: WARBYPARKER_GLADLY_EMAIL
      token_var: WARBYPARKER_GLADLY_API_TOKEN
    paths:                         # S3 path configuration
      report_name: contact_timestamps
      filename_pattern: "contact_timestamps_{date}.csv"
    snowflake:                     # Snowflake loading config
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: truncate_insert   # Always truncate_insert for Bronze
      deduplication:
        strategy: composite_key
        unique_columns:
          - CONTACT_ID
          - TIMESTAMP_FIELD
          - EVENT_TYPE
          - INITIATOR_ID
          - MESSAGE_ID
          - TARGET_AGENT_ID
        use_hash: true             # Hash unique columns for RECORD_KEY

  work_sessions:                   # Second source in same pipeline
    source_type: gladly_api
    load_strategy: full_refresh
    extractor:
      metric_set: WorkSessionsReport
      # ... (same auth vars)
    paths:
      report_name: work_sessions
      filename_pattern: "work_sessions_{date}.csv"
    snowflake:
      target_table: GLADLY_WORK_SESSIONS
      load_strategy: truncate_insert
      deduplication:
        strategy: composite_key
        unique_columns: [SESSION_ID, AGENT_ID, START_TIME]
        use_hash: true
```
**Production configs location:** Production YAML configs live in `apps/airflow/xo-pipelines/dags/configs/` (source of truth). `packages/xo-foundry/configs/` is for examples and templates only.
Pipeline Types¶
snowflake_load¶
Standard pipeline: Extract → Stage → COPY INTO Bronze.
Generated flow: For each source, creates extract → stage → load task chain.
intraday_refresh¶
Multiple refreshes per day with overlapping time windows (ADR 002).
```yaml
dag:
  pipeline_type: intraday_refresh
  time_window:
    refresh_type: intraday
    lag: { minutes: 30 }
    lookback: "8 hours"
```
legacy_elt¶
For MERGE operations into existing tables (OPERATIONS database migrations).
Generated flow: Uses merge_tasks.py instead of snowflake_tasks.py.
S3 Path Construction¶
The path_builder.py module constructs S3 paths from config:
```
Ingest: s3://xo-ingest/{domain}/{report_name}/{date}/(unknown)
Stage:  s3://xo-stage/{domain}/{report_name}/{load_strategy}/{date_or_timestamp}/(unknown)
```

Examples:

```
s3://xo-ingest/warbyparker/contact_timestamps/2026-01-15/contact_timestamps_2026-01-15.csv
s3://xo-stage/warbyparker/contact_timestamps/full_refresh/2026-01-15/contact_timestamps_2026-01-15.csv
```
The load_strategy segment in the stage path (full_refresh, incremental, historical) organizes files by their loading pattern per ADR 001.
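The construction above can be sketched with plain f-strings. The function and parameter names here are hypothetical, not the actual `path_builder.py` API; the path layout follows the patterns shown above.

```python
from datetime import date

# Illustrative sketch of the documented path scheme; these function names
# are assumptions, not the real path_builder.py interface.
def ingest_path(domain: str, report: str, d: date, pattern: str) -> str:
    filename = pattern.format(date=d.isoformat())
    return f"s3://xo-ingest/{domain}/{report}/{d.isoformat()}/{filename}"

def stage_path(domain: str, report: str, strategy: str, d: date, pattern: str) -> str:
    filename = pattern.format(date=d.isoformat())
    return f"s3://xo-stage/{domain}/{report}/{strategy}/{d.isoformat()}/{filename}"

print(ingest_path("warbyparker", "contact_timestamps", date(2026, 1, 15),
                  "contact_timestamps_{date}.csv"))
# → s3://xo-ingest/warbyparker/contact_timestamps/2026-01-15/contact_timestamps_2026-01-15.csv
```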
Time Window Configuration¶
Daily Mode¶
Output (with `lag: { days: 1 }`): a single date string, e.g. `2026-01-14`
Daily (No Lag)¶
Output (no `lag` configured): the current date, e.g. `2026-01-15`
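Both daily variants reduce to one calculation: apply the configured lag to the run time in the configured timezone, then keep only the date. A minimal sketch (the helper name is hypothetical):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def daily_window_date(run_time: datetime, lag_days: int, tz: str) -> str:
    """Apply the configured lag in the configured timezone, keep the date."""
    local = run_time.astimezone(ZoneInfo(tz))
    return (local - timedelta(days=lag_days)).date().isoformat()

run = datetime(2026, 1, 15, 6, 50, tzinfo=ZoneInfo("America/New_York"))
daily_window_date(run, 1, "America/New_York")  # lag {days: 1} → "2026-01-14"
daily_window_date(run, 0, "America/New_York")  # no lag → "2026-01-15"
```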
Intraday Relative¶
```yaml
time_window:
  refresh_type: intraday
  lag: { minutes: 30 }
  lookback: "8 hours"
  timezone: "America/New_York"
```
Output: Start/end ISO8601 timestamps based on execution time
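The relative computation can be sketched as: end = execution time minus lag, start = end minus lookback. This is an assumed reading of the config above, not the factory's exact code:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def intraday_window(execution: datetime, lag: timedelta,
                    lookback: timedelta, tz: str) -> tuple[str, str]:
    """Return (start, end) ISO8601 strings for a relative intraday window."""
    end = execution.astimezone(ZoneInfo(tz)) - lag
    start = end - lookback
    return start.isoformat(), end.isoformat()

run = datetime(2026, 1, 15, 12, 0, tzinfo=ZoneInfo("America/New_York"))
start, end = intraday_window(run, timedelta(minutes=30), timedelta(hours=8),
                             "America/New_York")
# end   → "2026-01-15T11:30:00-05:00"  (lag: 30 minutes)
# start → "2026-01-15T03:30:00-05:00"  (lookback: 8 hours)
```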
Intraday Absolute¶
```yaml
time_window:
  refresh_type: intraday
  start_time: "2026-01-15T17:00:00"
  end_time: "2026-01-16T09:00:00"
  timezone: "America/New_York"
```
Output: Fixed start/end timestamps
LagConfig¶
The lag field uses a structured model:
```yaml
lag: { days: 1 }            # 1 day
lag: { hours: 8 }           # 8 hours
lag: { minutes: 30 }        # 30 minutes
lag: { days: 1, hours: 6 }  # 1 day and 6 hours
```
Omitting lag means zero lag (current day/time).
Multi-Source Pipelines¶
A single YAML config can define multiple sources that are extracted in the same DAG:
```yaml
sources:
  contact_timestamps:
    source_type: gladly_api
    # ...
  work_sessions:
    source_type: gladly_api
    # ...
  conversation_timestamps:
    source_type: gladly_api
    # ...
  agent_durations:
    source_type: gladly_api
    # ...
```
Each source generates its own extract → stage → load task chain within the DAG. Task groups parallelize independent sources.
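Conceptually, the fan-out looks like the sketch below. The task-naming scheme is an assumption for illustration; real DAGs wire the chains with Airflow's TaskFlow API.

```python
# Hypothetical fan-out: one extract → stage → load chain per source key.
SOURCES = ["contact_timestamps", "work_sessions",
           "conversation_timestamps", "agent_durations"]

def task_chain(source_key: str) -> list[str]:
    """Ordered task IDs generated for one source (naming is assumed)."""
    return [f"{source_key}.{step}" for step in ("extract", "stage", "load")]

chains = {source: task_chain(source) for source in SOURCES}
# chains["work_sessions"] → ["work_sessions.extract", "work_sessions.stage",
#                            "work_sessions.load"]
```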
Pydantic Schema¶
The configuration schema lives in packages/xo-foundry/src/xo_foundry/schemas/dag_config.py.
Key Models¶
| Model | Fields |
|---|---|
| `DAGConfig` | `dag`, `globals`, `sources` |
| `DAGMetadata` | `domain`, `pipeline_name`, `pipeline_type`, `schedule`, `time_window` |
| `TimeWindowConfig` | `refresh_type`, `lag`, `lookback`, `timezone`, `start_time`, `end_time` |
| `LagConfig` | `days`, `hours`, `minutes` |
| `GlobalConfig` | `snowflake` (database, schema, connection_id, warehouse) |
| `SourceConfig` | `source_type`, `load_strategy`, `extractor`, `paths`, `snowflake`, `json_columns` |
| `ExtractorConfig` | Polymorphic -- varies by `source_type` (Gladly, Sprout, GSheet, etc.) |
| `DeduplicationConfig` | `strategy`, `unique_columns`, `use_hash` |
Validation¶
Pydantic validates at parse time:
- Required fields are present
- Enum values are valid (`pipeline_type`, `source_type`, `load_strategy`)
- Time window configurations are internally consistent
- Extractor configs match their `source_type`
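The enum check can be illustrated with a small stand-in. The real validation is performed by Pydantic against the models above; the helper and error wording here are illustrative:

```python
from enum import Enum

class PipelineType(str, Enum):
    snowflake_load = "snowflake_load"
    intraday_refresh = "intraday_refresh"
    legacy_elt = "legacy_elt"

def validate_pipeline_type(value: str) -> PipelineType:
    """Reject any value outside the documented pipeline_type enum."""
    try:
        return PipelineType(value)
    except ValueError:
        allowed = [member.value for member in PipelineType]
        raise ValueError(f"pipeline_type must be one of {allowed}, got {value!r}")

validate_pipeline_type("snowflake_load")   # OK
# validate_pipeline_type("bulk_copy")      # raises ValueError
```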
JSON Column Handling¶
Sources with JSON columns (e.g., from the Sprout API) list them in the source's `json_columns` field. This enables JSON normalization in RECORD_HASH calculations, preventing phantom updates caused by key-ordering differences.
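The normalization idea can be demonstrated with the standard library: serializing with sorted keys makes two semantically identical JSON values hash the same. This is a sketch of the concept, not the factory's exact hashing code:

```python
import hashlib
import json

def normalized_json_hash(value: dict) -> str:
    """Hash a JSON value after canonicalizing key order."""
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

a = {"channel": "email", "tags": ["vip"]}
b = {"tags": ["vip"], "channel": "email"}  # same data, keys reordered
assert normalized_json_hash(a) == normalized_json_hash(b)  # no phantom update
```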
Environment-Specific Config¶
The YAML config references environment-specific values via Airflow Variables and Connections:
```yaml
globals:
  snowflake:
    database: WBP_DB_DEV          # Dev database
    connection_id: snowflake_dev  # Dev Airflow connection

sources:
  contact_timestamps:
    extractor:
      base_url_var: WARBYPARKER_GLADLY_BASE_URL  # Airflow Variable
      email_var: WARBYPARKER_GLADLY_EMAIL        # Airflow Variable
      token_var: WARBYPARKER_GLADLY_API_TOKEN    # Airflow Variable
```
For production, these values are configured in the production Airflow environment (different connection IDs, database names, etc.).
Component Reference¶
| File | Purpose |
|---|---|
| `dag_factory/factory.py` | Main `DAGFactory` class -- orchestrates validation and rendering |
| `dag_factory/builders/path_builder.py` | S3 path construction from config |
| `dag_factory/templates/` | Jinja2 templates for DAG Python files |
| `schemas/dag_config.py` | Pydantic models for config validation |
| `cli/generate_dags.py` | CLI entry point (`xo-foundry generate-dag`) |
Related Documentation¶
- xo-foundry Overview -- Package overview and task library
- ELT Pipeline Flow -- Pipeline architecture
- Architecture Decisions -- ADRs 001, 002, 003