ADR 003: DAG Generation Strategy Evaluation
Status: Proposed
Date: 2025-12-10
Context: Evaluating DAG generation approaches for xo-foundry
Problem Statement
Should we continue with our custom build-time code generation approach (YAML → Python files), or adopt runtime DAG generation using the dag-factory library or a similar pattern?
Current State:
- Custom DAG factory in xo-foundry/dag_factory/
- Jinja2 templates generate Python DAG files
- CLI command: uv run xo-foundry generate-dag
- Generated files committed to apps/airflow/xo-pipelines/dags/
New Considerations:
- Intraday refresh patterns (2-4x daily)
- Variable-driven time windows (operational flexibility)
- Astronomer deployment pipeline
- Multiple clients with different patterns
Current Implementation Analysis
What We Have: Build-Time Code Generation
Architecture:
YAML Config                      Generated Python DAG
────────────                     ──────────────────────
warbyparker-                     warbyparker_timestamps_
timestamps.yaml    ──────>       daily.py (committed to git)
                   CLI tool
Process:
1. Write YAML config in packages/xo-foundry/configs/
2. Run uv run xo-foundry generate-dag --config <yaml> --output <path>
3. Generated Python file written to apps/airflow/xo-pipelines/dags/
4. Commit generated file to git
5. Deploy to Astronomer
Implementation Details:
- Factory Class: DAGFactory in dag_factory/factory.py
- Templates: Jinja2 templates in dag_factory/templates/ (e.g., snowflake_load.py.j2)
- Schema Validation: Pydantic models in schemas/dag_config.py
- Task Library: Reusable tasks in tasks/*_tasks.py
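The validate-then-render flow described above can be sketched with the standard library alone. This is a minimal illustration, not the actual xo-foundry code: it substitutes `string.Template` for Jinja2 and a dataclass for the Pydantic model so it runs without dependencies, and `DagConfig`, `DAG_TEMPLATE`, and `render_dag_source` are hypothetical names.

```python
from dataclasses import dataclass
from string import Template

# Stand-in for a Jinja2 template such as snowflake_load.py.j2 (hypothetical content).
DAG_TEMPLATE = Template('''\
from airflow.decorators import dag


@dag(dag_id="$dag_id", schedule="$schedule")
def pipeline():
    ...


pipeline()
''')


@dataclass(frozen=True)
class DagConfig:
    """Stand-in for the Pydantic model in schemas/dag_config.py."""

    dag_id: str
    schedule: str

    def __post_init__(self) -> None:
        if not self.dag_id.isidentifier():
            raise ValueError(f"dag_id must be a valid identifier: {self.dag_id!r}")
        if len(self.schedule.split()) != 5:
            raise ValueError(f"schedule must be a 5-field cron string: {self.schedule!r}")


def render_dag_source(config: DagConfig) -> str:
    """Render Python source for one DAG and check that it is syntactically valid."""
    source = DAG_TEMPLATE.substitute(dag_id=config.dag_id, schedule=config.schedule)
    # compile() only checks syntax; it does not import Airflow or run the DAG.
    compile(source, f"{config.dag_id}.py", "exec")
    return source


source = render_dag_source(DagConfig("warbyparker_timestamps_daily", "0 7 * * 1-5"))
```

Checking the rendered source with `compile()` at generation time is one way to catch a broken template before the file is committed.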
Generated DAG Structure:
from airflow.decorators import dag, task_group

# Hardcoded config dictionaries
BASE_EXTRACTOR_CONFIG = {...}
REPORTS_CONFIG = {
    "contact_timestamps": {...},
    "conversation_timestamps": {...}
}

@dag(dag_id="warbyparker_timestamps_daily", schedule="0 7 * * 1-5")
def pipeline():
    @task_group(group_id="gladlyapi_extract")
    def extract_reports():
        # Extraction logic using xo_foundry.tasks
        ...
Alternative Approach: Runtime DAG Generation
Option A: dag-factory Library (Open Source)
What is dag-factory?
- Open-source project maintained by Astronomer
- Reads YAML configs at Airflow runtime
- Dynamically creates DAG objects in memory
- No code generation step required
Architecture:
YAML Config                        DAG Object (runtime)
────────────                       ───────────────────
warbyparker-                       DAG object created
timestamps.yaml     ──────>        by dag-factory at
(read at runtime)   dag-factory    Airflow startup
Example Usage:
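# dags/dag_factory_loader.py
from pathlib import Path

from dagfactory import DagFactory

# DagFactory takes the path of one config file, so load each YAML file in turn
for config_path in Path("/usr/local/airflow/dags/configs").glob("*.yaml"):
    DagFactory(str(config_path)).generate_dags(globals())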
YAML Example:
my_dag:
  default_args:
    owner: "data-engineering"
    start_date: 2024-01-01
  schedule_interval: "0 7 * * 1-5"
  tasks:
    extract_data:
      operator: airflow.operators.python.PythonOperator
      python_callable: my_module.extract_function
Pros:
- ✅ No code generation step
- ✅ Changes to YAML = immediate effect (after scheduler refresh)
- ✅ Mature open-source project with community support
- ✅ Well-documented
- ✅ Reduced git churn (only YAML changes)
Cons:
- ❌ Operator-centric - tasks are declared as operator import paths, which fits poorly with our custom TaskFlow patterns
- ❌ Less flexible - hard to implement complex patterns (task groups, dynamic dependencies)
- ❌ YAML becomes complex - need to specify operator paths, callables, etc.
- ❌ Debugging harder - no generated Python to inspect
- ❌ Type safety lost - YAML not type-checked until runtime
- ❌ Custom tasks harder - our xo_foundry.tasks pattern doesn't map cleanly
- ❌ Learning curve - team needs to learn dag-factory DSL
Option B: Custom Runtime Generation (Inspired by dag-factory)
Build our own runtime generator tailored to xo-foundry patterns:
# dags/xo_dag_loader.py
from xo_foundry.runtime_factory import XODagFactory
factory = XODagFactory()
factory.load_configs_from_dir("/usr/local/airflow/dags/configs")
factory.generate_dags(globals())
Implementation:
# packages/xo-foundry/src/xo_foundry/runtime_factory.py
from airflow.decorators import dag

class XODagFactory:
    def generate_dags(self, globals_dict):
        # self.configs is populated by load_configs_from_dir()
        for config in self.configs:
            dag_callable = self._create_dag_function(config)
            dag_obj = dag_callable()
            globals_dict[config.dag_id] = dag_obj

    def _create_dag_function(self, config):
        @dag(dag_id=config.dag_id, schedule=config.schedule)
        def dynamic_dag():
            # Build task graph from config
            # (extract_task and load_task stand for task-library functions)
            for source in config.sources:
                extract = extract_task(source)
                load = load_task(extract, source)
        return dynamic_dag
Pros:
- ✅ No code generation step
- ✅ Full control over patterns
- ✅ Works with our custom TaskFlow patterns
- ✅ Type safety with Pydantic
- ✅ Easier debugging than dag-factory
Cons:
- ❌ Custom code to maintain - we own the runtime generator
- ❌ More complex than build-time generation
- ❌ Debugging complexity - DAG structure not visible in files
- ❌ Performance overhead - DAGs are rebuilt from config every time the loader file is parsed
- ❌ Harder to test - need to instantiate DAGs to see structure
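Both runtime options rely on the same mechanism: Airflow collects the DAG objects it finds in a module's global namespace, so the loader writes one entry per config into `globals()`. The sketch below illustrates that registration step without Airflow installed; `FakeDag` and `register_dags` are hypothetical stand-ins, not part of dag-factory or xo-foundry.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeDag:
    """Placeholder for an Airflow DAG object (illustration only)."""

    dag_id: str
    schedule: str


def register_dags(configs: list[dict[str, str]], namespace: dict[str, Any]) -> list[str]:
    """Build one DAG-like object per config and expose it under its dag_id."""
    registered = []
    for config in configs:
        dag_id = config["dag_id"]
        if dag_id in namespace:
            # Two configs with the same dag_id would silently shadow each other.
            raise ValueError(f"duplicate dag_id: {dag_id}")
        namespace[dag_id] = FakeDag(dag_id=dag_id, schedule=config["schedule"])
        registered.append(dag_id)
    return registered


module_namespace: dict[str, Any] = {}  # a real loader would pass globals()
registered = register_dags(
    [
        {"dag_id": "sprout_morning", "schedule": "0 8 * * 1-5"},
        {"dag_id": "sprout_evening", "schedule": "0 17 * * 1-5"},
    ],
    module_namespace,
)
```

The duplicate check is a design choice worth keeping in any runtime loader: with a shared namespace, a repeated `dag_id` otherwise replaces the earlier DAG without any error.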
Comparison Matrix
| Aspect | Current (Build-Time) | dag-factory (Runtime) | Custom Runtime |
|---|---|---|---|
| Code Generation | ✅ CLI tool | ❌ Not needed | ❌ Not needed |
| Git Diffs | ❌ Large Python diffs | ✅ Only YAML changes | ✅ Only YAML changes |
| Type Safety | ✅ Pydantic + mypy | ⚠️ Runtime only | ✅ Pydantic |
| Debugging | ✅ Inspect Python file | ❌ Runtime only | ⚠️ Runtime inspection |
| Flexibility | ✅ Full control | ❌ Limited operators | ✅ Full control |
| Custom Tasks | ✅ Native support | ❌ Complex | ✅ Native support |
| Performance | ✅ Static files | ⚠️ Parse YAML on each DAG parse | ⚠️ Build DAG on each DAG parse |
| Learning Curve | ⚠️ Need to learn templates | ⚠️ Learn dag-factory | ⚠️ Learn our patterns |
| Operational Changes | ❌ Requires deployment | ✅ YAML + scheduler refresh | ✅ YAML + scheduler refresh |
| Intraday Patterns | ⚠️ Multiple DAG files | ✅ Easy to add schedules | ✅ Easy to add schedules |
| Variable-Driven Windows | ✅ Can add to tasks | ✅ Natural fit | ✅ Natural fit |
| Testability | ✅ Import and test | ⚠️ Need runtime env | ⚠️ Need runtime env |
| Community Support | N/A | ✅ Open source project | ❌ We maintain |
Hybrid Approach: Best of Both Worlds
Recommendation: Enhanced Build-Time + Runtime Variables
Keep build-time code generation but enhance with runtime flexibility:
Structure:
# Generated DAG (build-time)
from airflow.decorators import dag, task
from airflow.models import Variable

@dag(dag_id="condenast_sprout_morning", schedule="0 8 * * 1-5")
def morning_refresh():
    @task
    def get_time_window(**context):
        # Runtime variable lookup
        config = Variable.get("sprout_refresh_config", deserialize_json=True)
        logical_date = context["logical_date"]
        return calculate_window(
            logical_date,
            config["morning"]["lookback_hours"],
            config["morning"]["buffer_minutes"]
        )

    window = get_time_window()
    extract = extract_sprout_messages(window)
    load = load_to_snowflake(extract)
Airflow Variable:
{
  "sprout_refresh_config": {
    "morning": {
      "lookback_hours": 15.5,
      "buffer_minutes": 30
    },
    "evening": {
      "lookback_hours": 9.5,
      "buffer_minutes": 30
    }
  }
}
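The generated task calls `calculate_window`, which this ADR does not define. A minimal sketch of one possible implementation follows. The semantics are an assumption: the window is taken to end `buffer_minutes` before the reference time and to span `lookback_hours`. The parameter name `reference_time` is also hypothetical, since which timestamp the DAG passes in (the logical date or the actual run time) depends on the schedule's timetable.

```python
from datetime import datetime, timedelta, timezone


def calculate_window(
    reference_time: datetime, lookback_hours: float, buffer_minutes: float
) -> tuple[datetime, datetime]:
    """Return (start, end) for an extraction window ending before reference_time.

    Assumed semantics: the window ends buffer_minutes before reference_time
    and spans lookback_hours.
    """
    if lookback_hours <= 0 or buffer_minutes < 0:
        raise ValueError("lookback_hours must be > 0 and buffer_minutes >= 0")
    end = reference_time - timedelta(minutes=buffer_minutes)
    start = end - timedelta(hours=lookback_hours)
    return start, end


morning_run = datetime(2025, 12, 10, 8, 0, tzinfo=timezone.utc)
start, end = calculate_window(morning_run, lookback_hours=15.5, buffer_minutes=30)
# start = 2025-12-09 16:00 UTC, end = 2025-12-10 07:30 UTC
```

Keeping the calculation a pure function of its three inputs makes it unit-testable without an Airflow context.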
Benefits:
- ✅ Operational flexibility: Change time windows without redeployment
- ✅ Type safety: Generated code is type-checked
- ✅ Debuggable: Can read Python DAG files
- ✅ Performant: Static DAG structure, dynamic parameters only
- ✅ Best practices: Aligns with industry patterns (Fivetran, Stitch)
Decision: Keep Build-Time Generation + Add Runtime Variables
Rationale
- Type Safety & Quality
    - Generated Python code passes mypy validation
    - Catch errors at build time, not runtime
    - Clear inspection of DAG structure
- Debugging & Observability
    - Can read generated Python to understand flow
    - Stack traces reference actual Python files
    - Easier to troubleshoot in production
- Performance
    - Airflow re-parses DAG files periodically, but static files do no config loading or validation at parse time
    - No YAML parsing overhead on every scheduler loop
    - Static DAG structure = faster parsing
- Flexibility Where Needed
    - Time windows via Airflow Variables (operational changes)
    - DAG structure via code generation (engineering changes)
    - Clear separation of concerns
- Testing & CI/CD
    - Can unit test generated DAG files
    - Generated code reviewed in PRs
    - Type checking in CI pipeline
- Custom Patterns
    - Full support for xo-foundry task patterns
    - TaskFlow API @task decorators work naturally
    - Complex task groups and dependencies
Implementation Plan
Phase 1: Enhance Current Generator (Week 1-2)
1.1 Add Intraday Refresh Template
Create: dag_factory/templates/intraday_refresh.py.j2
- Support multiple schedules per source
- Variable-driven time windows
- Proper overlap handling
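What "proper overlap handling" means depends on how adjacent windows relate. As a rough sketch, the helpers below measure the overlap and the gap between two windows. `window_overlap` and `window_gap` are hypothetical names. The example windows use this ADR's morning and evening values under the assumption that each window ends `buffer_minutes` before its run time: an 08:00 run with a 15.5 hour lookback, and a 17:00 run with a 9.5 hour lookback.

```python
from datetime import datetime, timedelta, timezone

Window = tuple[datetime, datetime]  # (start, end)


def window_overlap(first: Window, second: Window) -> timedelta:
    """Return how long two windows overlap; zero if they only touch or are disjoint."""
    latest_start = max(first[0], second[0])
    earliest_end = min(first[1], second[1])
    return max(earliest_end - latest_start, timedelta(0))


def window_gap(first: Window, second: Window) -> timedelta:
    """Return the uncovered time between two windows; zero if they touch or overlap."""
    earlier, later = sorted([first, second])
    return max(later[0] - earlier[1], timedelta(0))


utc = timezone.utc
morning = (datetime(2025, 12, 9, 16, 0, tzinfo=utc), datetime(2025, 12, 10, 7, 30, tzinfo=utc))
evening = (datetime(2025, 12, 10, 7, 0, tzinfo=utc), datetime(2025, 12, 10, 16, 30, tzinfo=utc))

overlap = window_overlap(morning, evening)  # 30 minutes extracted by both runs
gap = window_gap(morning, evening)          # zero: no data falls between the runs
```

Under these assumptions a nonzero overlap means some records are extracted twice, so the load step has to deduplicate, while a nonzero gap means records are missed. A generator-time check that the gap is zero would catch the second case before deployment.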
1.2 Enhance YAML Schema
# New schema fields
dag:
  refresh_pattern: intraday  # or daily, hourly
  refreshes:
    - name: morning
      schedule: "0 8 * * 1-5"
      time_window:
        lookback_hours: 15.5
        buffer_minutes: 30
    - name: evening
      schedule: "0 17 * * 1-5"
      time_window:
        lookback_hours: 9.5
        buffer_minutes: 30
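The new fields could be validated along these lines. This sketch uses dataclasses as a dependency-free stand-in for the Pydantic models in schemas/dag_config.py. The class names, the validation rules, and `parse_refreshes` are assumptions for illustration, and the input is the already-parsed `refreshes` list rather than raw YAML.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimeWindow:
    lookback_hours: float
    buffer_minutes: float = 0

    def __post_init__(self) -> None:
        if self.lookback_hours <= 0:
            raise ValueError("lookback_hours must be positive")
        if self.buffer_minutes < 0:
            raise ValueError("buffer_minutes must not be negative")


@dataclass(frozen=True)
class Refresh:
    name: str
    schedule: str
    time_window: TimeWindow


def parse_refreshes(raw: list[dict]) -> list[Refresh]:
    """Convert the parsed YAML `refreshes` list into validated objects."""
    refreshes = [
        Refresh(
            name=item["name"],
            schedule=item["schedule"],
            time_window=TimeWindow(**item["time_window"]),
        )
        for item in raw
    ]
    names = [refresh.name for refresh in refreshes]
    if len(names) != len(set(names)):
        raise ValueError(f"refresh names must be unique: {names}")
    return refreshes


refreshes = parse_refreshes(
    [
        {"name": "morning", "schedule": "0 8 * * 1-5",
         "time_window": {"lookback_hours": 15.5, "buffer_minutes": 30}},
        {"name": "evening", "schedule": "0 17 * * 1-5",
         "time_window": {"lookback_hours": 9.5, "buffer_minutes": 30}},
    ]
)
```

Requiring unique refresh names matters because the name is what the generated task uses to look up its time window.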
1.3 Generate Variable-Aware Tasks
# Generated code includes Variable lookups
@task
def get_time_window(refresh_name: str, **context):
    config = Variable.get(f"{DAG_ID}_config", deserialize_json=True)
    refresh_config = config["refreshes"][refresh_name]
    # Calculate window using config
Phase 2: Add Runtime Configuration (Week 3)
2.1 Variable Initialization
# Generated DAG includes variable setup code
DEFAULT_CONFIG = {
    "refreshes": {
        "morning": {"lookback_hours": 15.5, "buffer_minutes": 30},
        "evening": {"lookback_hours": 9.5, "buffer_minutes": 30}
    }
}

# Initialize the variable if it does not exist yet.
# Top-level code runs on every DAG parse, so this lookup repeats on each parse.
try:
    Variable.get(f"{DAG_ID}_config")
except KeyError:
    Variable.set(f"{DAG_ID}_config", DEFAULT_CONFIG, serialize_json=True)
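Once the Variable is editable in the UI, a hand-edited value may drop a key or a whole refresh entry. One way to guard against that is to overlay the stored value on the generated defaults instead of indexing into it directly. The sketch below takes the already-deserialized Variable value as a plain dict, so it runs without Airflow; `resolve_refresh_config` is a hypothetical helper, not existing xo-foundry code.

```python
from typing import Any, Optional

DEFAULT_CONFIG: dict[str, Any] = {
    "refreshes": {
        "morning": {"lookback_hours": 15.5, "buffer_minutes": 30},
        "evening": {"lookback_hours": 9.5, "buffer_minutes": 30},
    }
}


def resolve_refresh_config(
    stored: Optional[dict[str, Any]], refresh_name: str
) -> dict[str, float]:
    """Overlay the stored Variable value on the generated defaults for one refresh."""
    defaults = DEFAULT_CONFIG["refreshes"].get(refresh_name)
    if defaults is None:
        raise KeyError(f"unknown refresh: {refresh_name}")
    overrides = ((stored or {}).get("refreshes") or {}).get(refresh_name) or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # A misspelled key would otherwise be ignored without any warning.
        raise ValueError(f"unexpected keys for {refresh_name}: {sorted(unknown)}")
    return {**defaults, **overrides}


# An operator shortened the morning lookback and left everything else alone.
stored_value = {"refreshes": {"morning": {"lookback_hours": 12}}}
morning = resolve_refresh_config(stored_value, "morning")
evening = resolve_refresh_config(stored_value, "evening")
```

With this approach the defaults live in the generated file and the Variable only needs to hold overrides, which may also remove the need to create the Variable at parse time.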
2.2 Variable Update UI Docs
Document how to update configs via:
1. Airflow UI: Admin → Variables
2. CLI: airflow variables set
3. API: POST /variables
Phase 3: Sprout Intraday Implementation (Week 4)
3.1 Create Sprout YAML Config
# condenast-sprout-intraday.yaml
dag:
  domain: condenast
  pipeline_name: sprout_intraday
  refresh_pattern: intraday
  refreshes:
    - morning
    - evening
3.2 Generate Two DAGs
3.3 Test & Deploy
- Local Astronomer testing
- Staging deployment
- Production rollout
Phase 4: Documentation & Training (Week 5)
4.1 Update CLAUDE.md
- Document intraday patterns
- Variable-driven configuration
- Time window calculation
4.2 Create Runbook
- How to add new intraday refresh
- How to modify time windows
- How to troubleshoot overlaps
4.3 Team Training
- Workshop on new patterns
- Operational procedures
- Monitoring & alerting
Future Considerations
When to Reconsider Runtime Generation
Trigger Points:
- 10+ refresh patterns per client (excessive DAG files)
- Frequent operational changes to schedules (daily adjustments)
- Need for self-service DAG creation (non-engineers)
At That Point:
- Evaluate dag-factory maturity
- Consider custom runtime generator
- Cost/benefit of maintaining generated files
Gradual Migration Path
If we later adopt runtime generation:
- Phase 1: Add runtime loader alongside generated DAGs
- Phase 2: Migrate low-complexity DAGs to runtime
- Phase 3: Deprecate templates for simple patterns
- Phase 4: Keep templates for complex patterns only
Key: Our task library (xo_foundry.tasks) remains reusable regardless of generation strategy.
Consequences
Positive
- ✅ Type safety and quality maintained
- ✅ Operational flexibility for time windows
- ✅ Clear debugging and inspection
- ✅ Incremental enhancement of current system
- ✅ No major architectural disruption
Negative
- ❌ Generated Python files still in git (but less churn with variables)
- ❌ Two-step process: generate + configure variables
- ❌ More complex than pure runtime generation
Neutral
- ⚪ Learning curve for variable-driven configs
- ⚪ Need clear docs on when to regenerate vs change variables
Alignment with Claude Chat Recommendations
The hybrid approach incorporates these best practices:
| Recommendation | Implementation |
|---|---|
| Configuration-driven flexibility | ✅ Airflow Variables for time windows |
| TaskFlow API | ✅ Already using @dag, @task decorators |
| Time window calculation pattern | ✅ @task with Variable lookup |
| Idempotent S3 partitioning | ✅ Already implemented in path_builder |
| Snowflake deduplication | ✅ MERGE with ROW_NUMBER (already have) |
| Variable-driven intervals | ✅ Via Airflow Variables |
| SLA monitoring | 🔄 Add to generated DAGs |
| Avoid hardcoding time windows | ✅ Variables prevent hardcoding |
References
- dag-factory GitHub
- Airflow Best Practices - Dynamic DAGs
- ADR 002: Intraday Refresh Patterns
- Astronomer Registry - dag-factory
Decision Makers
Engineering Team
Related Decisions
- ADR 001: Load Strategy Terminology
- ADR 002: Intraday Refresh Patterns
- Future: ADR on self-service DAG creation (if needed)