
ADR 003: DAG Generation Strategy Evaluation

Status: Proposed
Date: 2025-12-10
Context: Evaluating DAG generation approaches for xo-foundry


Problem Statement

Should we continue with our custom build-time code generation approach (YAML → Python files), or adopt runtime DAG generation using dag-factory library or similar patterns?

Current State:
- Custom DAG factory in xo-foundry/dag_factory/
- Jinja2 templates generate Python DAG files
- CLI command: uv run xo-foundry generate-dag
- Generated files committed to apps/airflow/xo-pipelines/dags/

New Considerations:
- Intraday refresh patterns (2-4x daily)
- Variable-driven time windows (operational flexibility)
- Astronomer deployment pipeline
- Multiple clients with different patterns


Current Implementation Analysis

What We Have: Build-Time Code Generation

Architecture:

YAML Config                    Generated Python DAG
────────────                   ──────────────────────
warbyparker-                   warbyparker_timestamps_
timestamps.yaml    ──────>     daily.py (committed to git)
                   CLI tool

Process:
1. Write YAML config in packages/xo-foundry/configs/
2. Run uv run xo-foundry generate-dag --config <yaml> --output <path>
3. Generated Python file is written to apps/airflow/xo-pipelines/dags/
4. Commit the generated file to git
5. Deploy to Astronomer

Implementation Details:
- Factory Class: DAGFactory in dag_factory/factory.py
- Templates: Jinja2 templates in dag_factory/templates/ (e.g., snowflake_load.py.j2)
- Schema Validation: Pydantic models in schemas/dag_config.py
- Task Library: Reusable tasks in tasks/*_tasks.py
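To make the render-then-commit step concrete, here is a minimal sketch of the flow. The stdlib `string.Template` stands in for Jinja2, and the template body is a trimmed illustration, not the real snowflake_load.py.j2:

```python
# Stdlib stand-in for the Jinja2 render step; the template body below is
# a trimmed illustration, not the actual snowflake_load.py.j2 template.
from string import Template

template_src = Template(
    '@dag(dag_id="$dag_id", schedule="$schedule")\n'
    "def pipeline(): ...\n"
)

# Values come from the validated YAML config; the CLI writes the result
# to apps/airflow/xo-pipelines/dags/ for commit.
rendered = template_src.substitute(
    dag_id="warbyparker_timestamps_daily",
    schedule="0 7 * * 1-5",
)
print(rendered)
```

The key property is that all templating happens at build time, so the committed artifact is plain Python that mypy and reviewers can inspect.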

Generated DAG Structure:

```python
# Hardcoded config dictionaries
BASE_EXTRACTOR_CONFIG = {...}
REPORTS_CONFIG = {
    "contact_timestamps": {...},
    "conversation_timestamps": {...},
}

@dag(dag_id="warbyparker_timestamps_daily", schedule="0 7 * * 1-5")
def pipeline():
    @task_group(group_id="gladlyapi_extract")
    def extract_reports():
        ...  # Extraction logic using xo_foundry.tasks

    extract_reports()

pipeline()
```


Alternative Approach: Runtime DAG Generation

Option A: dag-factory Library (Open Source)

What is dag-factory?
- Open-source project by Astronomer
- Reads YAML configs at Airflow runtime
- Dynamically creates DAG objects in memory
- No code generation step required

Architecture:

YAML Config                    DAG Object (runtime)
────────────                   ───────────────────
warbyparker-                   DAG object created
timestamps.yaml    ──────>     by dag-factory at
(read at runtime)  dag-factory Airflow startup

Example Usage:

```python
# dags/dag_factory_loader.py
from pathlib import Path
from dagfactory import DagFactory

# DagFactory takes one config file at a time, so glob over the directory
for config_file in Path("/usr/local/airflow/dags/configs").glob("*.yaml"):
    DagFactory(str(config_file)).generate_dags(globals())
```

YAML Example:

```yaml
my_dag:
  default_args:
    owner: "data-engineering"
    start_date: 2024-01-01
  schedule_interval: "0 7 * * 1-5"
  tasks:
    extract_data:
      operator: airflow.operators.python.PythonOperator
      python_callable: my_module.extract_function
```

Pros:
- ✅ No code generation step
- ✅ Changes to YAML take immediate effect (after scheduler refresh)
- ✅ Mature open-source project with community support
- ✅ Well-documented
- ✅ Reduced git churn (only YAML changes)

Cons:
- ❌ Limited to built-in operators: doesn't work well with custom TaskFlow patterns
- ❌ Less flexible: hard to implement complex patterns (task groups, dynamic dependencies)
- ❌ YAML becomes complex: need to specify operator paths, callables, etc.
- ❌ Debugging is harder: no generated Python to inspect
- ❌ Type safety lost: YAML is not type-checked until runtime
- ❌ Custom tasks are harder: our xo_foundry.tasks pattern doesn't map cleanly
- ❌ Learning curve: team needs to learn the dag-factory DSL


Option B: Custom Runtime Generation (Inspired by dag-factory)

Build our own runtime generator tailored to xo-foundry patterns:

```python
# dags/xo_dag_loader.py
from xo_foundry.runtime_factory import XODagFactory

factory = XODagFactory()
factory.load_configs_from_dir("/usr/local/airflow/dags/configs")
factory.generate_dags(globals())
```

Implementation:

```python
# packages/xo-foundry/src/xo_foundry/runtime_factory.py
class XODagFactory:
    def generate_dags(self, globals_dict):
        for config in self.configs:
            dag_callable = self._create_dag_function(config)
            # Calling a @dag-decorated function returns the DAG object
            globals_dict[config.dag_id] = dag_callable()

    def _create_dag_function(self, config):
        @dag(dag_id=config.dag_id, schedule=config.schedule)
        def dynamic_dag():
            # Build the task graph from config
            for source in config.sources:
                extract = extract_task(source)
                load_task(extract, source)
        return dynamic_dag
```

Pros:
- ✅ No code generation step
- ✅ Full control over patterns
- ✅ Works with our custom TaskFlow patterns
- ✅ Type safety with Pydantic
- ✅ Easier debugging than dag-factory

Cons:
- ❌ Custom code to maintain: we own the runtime generator
- ❌ More complex than build-time generation
- ❌ Debugging complexity: DAG structure not visible in files
- ❌ Performance overhead: DAG construction on every scheduler loop
- ❌ Harder to test: need to instantiate DAGs to see structure


Comparison Matrix

| Aspect | Current (Build-Time) | dag-factory (Runtime) | Custom Runtime |
| --- | --- | --- | --- |
| Code Generation | ✅ CLI tool | ❌ Not needed | ❌ Not needed |
| Git Diffs | ❌ Large Python diffs | ✅ Only YAML changes | ✅ Only YAML changes |
| Type Safety | ✅ Pydantic + mypy | ⚠️ Runtime only | ✅ Pydantic |
| Debugging | ✅ Inspect Python file | ❌ Runtime only | ⚠️ Runtime inspection |
| Flexibility | ✅ Full control | ❌ Limited operators | ✅ Full control |
| Custom Tasks | ✅ Native support | ❌ Complex | ✅ Native support |
| Performance | ✅ Static files | ⚠️ Parse YAML each run | ⚠️ Build DAG each run |
| Learning Curve | ⚠️ Need to learn templates | ⚠️ Learn dag-factory | ⚠️ Learn our patterns |
| Operational Changes | ❌ Requires deployment | ✅ YAML + scheduler refresh | ✅ YAML + scheduler refresh |
| Intraday Patterns | ⚠️ Multiple DAG files | ✅ Easy to add schedules | ✅ Easy to add schedules |
| Variable-Driven Windows | ✅ Can add to tasks | ✅ Natural fit | ✅ Natural fit |
| Testability | ✅ Import and test | ⚠️ Need runtime env | ⚠️ Need runtime env |
| Community Support | N/A | ✅ Open source project | ❌ We maintain |

Hybrid Approach: Best of Both Worlds

Recommendation: Enhanced Build-Time + Runtime Variables

Keep build-time code generation but enhance with runtime flexibility:

Structure:

```python
# Generated DAG (build-time)
from airflow.decorators import dag, task
from airflow.models import Variable

@dag(dag_id="condenast_sprout_morning", schedule="0 8 * * 1-5")
def morning_refresh():

    @task
    def get_time_window(**context):
        # Runtime variable lookup
        config = Variable.get("sprout_refresh_config", deserialize_json=True)

        logical_date = context["logical_date"]
        return calculate_window(
            logical_date,
            config["morning"]["lookback_hours"],
            config["morning"]["buffer_minutes"],
        )

    # extract_sprout_messages / load_to_snowflake come from the
    # xo-foundry task library
    window = get_time_window()
    extract = extract_sprout_messages(window)
    load_to_snowflake(extract)

morning_refresh()
```
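The `calculate_window` helper referenced above is not shown in this ADR; one possible sketch, assuming the window ends `buffer_minutes` before the run's logical date and reaches back `lookback_hours` from that point:

```python
# Sketch of calculate_window (the real implementation may differ): the
# buffer shields against late-arriving data, the lookback creates the
# deliberate overlap that downstream deduplication absorbs.
from datetime import datetime, timedelta

def calculate_window(
    logical_date: datetime, lookback_hours: float, buffer_minutes: int
) -> tuple[datetime, datetime]:
    window_end = logical_date - timedelta(minutes=buffer_minutes)
    window_start = window_end - timedelta(hours=lookback_hours)
    return window_start, window_end

# Morning run at 08:00 with a 15.5h lookback and 30min buffer
start, end = calculate_window(datetime(2025, 12, 10, 8, 0), 15.5, 30)
print(start, end)  # window reaches back into the previous afternoon
```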

Airflow Variable (value stored under the sprout_refresh_config key, so that Variable.get("sprout_refresh_config", deserialize_json=True) returns this object directly):

```json
{
  "morning": {
    "lookback_hours": 15.5,
    "buffer_minutes": 30
  },
  "evening": {
    "lookback_hours": 9.5,
    "buffer_minutes": 30
  }
}
```

Benefits:
- ✅ Operational flexibility: change time windows without redeployment
- ✅ Type safety: generated code is type-checked
- ✅ Debuggable: can read Python DAG files
- ✅ Performant: static DAG structure, dynamic parameters only
- ✅ Best practices: aligns with industry patterns (Fivetran, Stitch)


Decision: Keep Build-Time Generation + Add Runtime Variables

Rationale

1. Type Safety & Quality
   - Generated Python code passes mypy validation
   - Errors are caught at build time, not runtime
   - Clear inspection of DAG structure

2. Debugging & Observability
   - Can read generated Python to understand flow
   - Stack traces reference actual Python files
   - Easier to troubleshoot in production

3. Performance
   - Airflow scheduler parses Python files once
   - No YAML parsing overhead on every scheduler loop
   - Static DAG structure = faster parsing

4. Flexibility Where Needed
   - Time windows via Airflow Variables (operational changes)
   - DAG structure via code generation (engineering changes)
   - Clear separation of concerns

5. Testing & CI/CD
   - Can unit test generated DAG files
   - Generated code reviewed in PRs
   - Type checking in CI pipeline

6. Custom Patterns
   - Full support for xo-foundry task patterns
   - TaskFlow API @task decorators work naturally
   - Complex task groups and dependencies

Implementation Plan

Phase 1: Enhance Current Generator (Week 1-2)

1.1 Add Intraday Refresh Template

Create: dag_factory/templates/intraday_refresh.py.j2
- Support multiple schedules per source
- Variable-driven time windows
- Proper overlap handling

1.2 Enhance YAML Schema

```yaml
# New schema fields
dag:
  refresh_pattern: intraday  # or daily, hourly
  refreshes:
    - name: morning
      schedule: "0 8 * * 1-5"
      time_window:
        lookback_hours: 15.5
        buffer_minutes: 30
    - name: evening
      schedule: "0 17 * * 1-5"
      time_window:
        lookback_hours: 9.5
        buffer_minutes: 30
```
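The shape of these schema fields can be sketched with stdlib dataclasses (the real models in schemas/dag_config.py use Pydantic, which adds validation on top of this structure):

```python
# Dataclass stand-in for the Pydantic models in schemas/dag_config.py;
# field names mirror the YAML schema above.
from dataclasses import dataclass

@dataclass
class TimeWindow:
    lookback_hours: float
    buffer_minutes: int

@dataclass
class Refresh:
    name: str
    schedule: str
    time_window: TimeWindow

@dataclass
class DagConfig:
    refresh_pattern: str  # "intraday", "daily", or "hourly"
    refreshes: list[Refresh]

config = DagConfig(
    refresh_pattern="intraday",
    refreshes=[
        Refresh("morning", "0 8 * * 1-5", TimeWindow(15.5, 30)),
        Refresh("evening", "0 17 * * 1-5", TimeWindow(9.5, 30)),
    ],
)
```

With Pydantic, the same structure would reject a config whose `lookback_hours` is not numeric at generation time, before any code is emitted.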

1.3 Generate Variable-Aware Tasks

```python
# Generated code includes Variable lookups
@task
def get_time_window(refresh_name: str, **context):
    config = Variable.get(f"{DAG_ID}_config", deserialize_json=True)
    refresh_config = config["refreshes"][refresh_name]
    ...  # Calculate the window using refresh_config
```

Phase 2: Add Runtime Configuration (Week 3)

2.1 Variable Initialization

```python
# Generated DAG includes variable setup code
DEFAULT_CONFIG = {
    "refreshes": {
        "morning": {"lookback_hours": 15.5, "buffer_minutes": 30},
        "evening": {"lookback_hours": 9.5, "buffer_minutes": 30},
    }
}

# Initialize the variable if it does not exist (runs once)
try:
    Variable.get(f"{DAG_ID}_config")
except KeyError:
    Variable.set(f"{DAG_ID}_config", DEFAULT_CONFIG, serialize_json=True)
```

2.2 Variable Update UI Docs

Document how to update configs via:
1. Airflow UI: Admin → Variables
2. CLI: airflow variables set
3. API: POST /variables
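For the CLI route, the value must be passed as a JSON string. A sketch of building that payload (the variable key follows the f"{DAG_ID}_config" convention; the DAG id here is illustrative):

```python
# Build the JSON value for `airflow variables set`; the key name and the
# adjusted lookback value below are illustrative, not prescribed.
import json

payload = {
    "refreshes": {
        "morning": {"lookback_hours": 16.0, "buffer_minutes": 30},
        "evening": {"lookback_hours": 9.5, "buffer_minutes": 30},
    }
}

value = json.dumps(payload)
# Shell equivalent (no redeployment needed):
#   airflow variables set condenast_sprout_intraday_config "$value"
print(value)
```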

Phase 3: Sprout Intraday Implementation (Week 4)

3.1 Create Sprout YAML Config

```yaml
# condenast-sprout-intraday.yaml
dag:
  domain: condenast
  pipeline_name: sprout_intraday
  refresh_pattern: intraday
  refreshes:
    - name: morning
      schedule: "0 8 * * 1-5"
    - name: evening
      schedule: "0 17 * * 1-5"
```

3.2 Generate Two DAGs

- condenast_sprout_morning_refresh.py
- condenast_sprout_evening_refresh.py

3.3 Test & Deploy
- Local Astronomer testing
- Staging deployment
- Production rollout

Phase 4: Documentation & Training (Week 5)

4.1 Update CLAUDE.md
- Document intraday patterns
- Variable-driven configuration
- Time window calculation

4.2 Create Runbook
- How to add a new intraday refresh
- How to modify time windows
- How to troubleshoot overlaps

4.3 Team Training
- Workshop on new patterns
- Operational procedures
- Monitoring & alerting


Future Considerations

When to Reconsider Runtime Generation

Trigger Points:
- 10+ refresh patterns per client (excessive DAG files)
- Frequent operational changes to schedules (daily adjustments)
- Need for self-service DAG creation (non-engineers)

At That Point:
- Evaluate dag-factory maturity
- Consider a custom runtime generator
- Weigh the cost/benefit of maintaining generated files

Gradual Migration Path

If we later adopt runtime generation:

  1. Phase 1: Add runtime loader alongside generated DAGs
  2. Phase 2: Migrate low-complexity DAGs to runtime
  3. Phase 3: Deprecate templates for simple patterns
  4. Phase 4: Keep templates for complex patterns only

Key: Our task library (xo_foundry.tasks) remains reusable regardless of generation strategy.


Consequences

Positive

  • ✅ Type safety and quality maintained
  • ✅ Operational flexibility for time windows
  • ✅ Clear debugging and inspection
  • ✅ Incremental enhancement of current system
  • ✅ No major architectural disruption

Negative

  • ❌ Generated Python files still in git (but less churn with variables)
  • ❌ Two-step process: generate + configure variables
  • ❌ More complex than pure runtime generation

Neutral

  • ⚪ Learning curve for variable-driven configs
  • ⚪ Need clear docs on when to regenerate vs change variables

Alignment with Claude Chat Recommendations

The hybrid approach incorporates these best practices:

| Recommendation | Implementation |
| --- | --- |
| Configuration-driven flexibility | ✅ Airflow Variables for time windows |
| TaskFlow API | ✅ Already using @dag, @task decorators |
| Time window calculation pattern | ✅ @task with Variable lookup |
| Idempotent S3 partitioning | ✅ Already implemented in path_builder |
| Snowflake deduplication | ✅ MERGE with ROW_NUMBER (already have) |
| Variable-driven intervals | ✅ Via Airflow Variables |
| SLA monitoring | 🔄 Add to generated DAGs |
| Avoid hardcoding time windows | ✅ Variables prevent hardcoding |

References

  • ADR 001: Load Strategy Terminology
  • ADR 002: Intraday Refresh Patterns
  • Future: ADR on self-service DAG creation (if needed)

Decision Makers

Engineering Team