ADR 007: Task-Level Skip Logic for Airflow DAG Parameters
Status: Accepted
Date: 2026-01-05
Context: Implementing runtime toggles for Airflow DAG operations
Problem Statement
How should we implement runtime parameter toggles in Airflow DAGs (e.g., skip_dynamic_refresh, debug_mode) that control whether certain tasks execute?
Use Case: Skip dynamic table refresh during backfill operations to avoid unnecessary warehouse compute, while still showing the task in the DAG graph.
Constraints:
- Airflow TaskFlow API (Airflow 3.0)
- Jinja2 DAG generation templates
- Task functions need access to DAG-level parameters
- DAG structure should be consistent (tasks always present in the graph)
Decision
Implement skip logic at the task function level using Airflow context parameters.
Each task function reads its toggle from context["params"] and decides at execution time whether to skip its work. We chose this over:
- Creating extra tasks to extract parameters
- Conditionally creating tasks based on parameters
- Checking parameters at multiple levels (DAG → task group → task)
Implementation Pattern
```python
import logging

from airflow.sdk import dag, task, task_group

logger = logging.getLogger(__name__)


# DAG level - declare parameter with default value
@dag(params={"skip_dynamic_refresh": False})
def my_pipeline():
    # Task group level - always create the task (no parameter checking)
    @task_group
    def refresh_group(merge_results: list):
        # Always call the task - no conditional logic
        return refresh_tables(merge_results)

    # Task function level - check context and decide whether to skip
    @task
    def refresh_tables(merge_results: list, **context):
        skip_refresh = context.get("params", {}).get("skip_dynamic_refresh", False)
        if skip_refresh:
            logger.info("Skipping refresh (skip_dynamic_refresh=True)")
            return {"status": "skipped", "message": "Refresh skipped via parameter"}
        # Otherwise execute normal logic
        logger.info("Executing refresh")
        # ... actual refresh code ...
        return {"status": "success"}

    # merge_results would come from upstream merge tasks (not shown)
    refresh_group(merge_results=[])


my_pipeline()
```
Key Principles
- Single Source of Truth: Parameter extracted once in task function
- Consistent DAG Structure: Task always appears in graph (not conditionally created)
- Runtime Decision: Skip decision made at execution time, not parse time
- Clear Ownership: Task function owns skip logic, not DAG or task group
Alternatives Considered
Alternative 1: Extra Parameter Extraction Task
Pattern:
```python
@task
def check_skip_refresh(**context):
    return context.get("params", {}).get("skip_dynamic_refresh", False)

skip_flag = check_skip_refresh()
refresh_result = refresh_group(merge_results, skip_refresh=skip_flag)
```
Rejected Because:
- ❌ Extra task overhead (an additional node in the DAG graph)
- ❌ Parameter handled at multiple levels (task, then task group, then function)
- ❌ More complex task dependencies
- ❌ Harder to debug (logic spread across multiple locations)
Alternative 2: Conditional Task Creation
Pattern:
```python
@task_group
def refresh_group(merge_results: list, skip_refresh: bool):
    if skip_refresh:
        @task
        def skip_dynamic_refresh():
            return {"status": "skipped"}

        return skip_dynamic_refresh()
    else:
        return refresh_tables(merge_results)
```
Rejected Because:
- ❌ DAG structure changes based on the parameter (different tasks are created)
- ❌ Parse-time decision: the task group body runs when the DAG file is parsed, so the `if` sees the parse-time value (or an unresolved task output), never the value supplied when a run is triggered
- ❌ Task graph is inconsistent across runs
- ❌ Behavior can't be changed with runtime parameters
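The structural difference between Alternative 2 and the chosen pattern can be illustrated with a toy model in plain Python (no Airflow involved). It treats a DAG as the list of task IDs produced at parse time. `build_conditional`, `build_task_level`, and `run_task_level` are hypothetical names standing in for the two approaches, not real Airflow APIs.

```python
def build_conditional(skip_refresh: bool) -> list[str]:
    """Toy Alternative 2: the parse-time flag decides which task exists."""
    tasks = ["merge"]
    if skip_refresh:
        tasks.append("skip_dynamic_refresh")
    else:
        tasks.append("refresh_tables")
    return tasks


def build_task_level() -> list[str]:
    """Toy chosen pattern: the graph never depends on the flag."""
    return ["merge", "refresh_tables"]


def run_task_level(params: dict) -> dict:
    """At execution time the single task reads the flag and decides what to do."""
    if params.get("skip_dynamic_refresh", False):
        return {"status": "skipped"}
    return {"status": "success"}


if __name__ == "__main__":
    # Conditional creation yields two different graphs
    print(build_conditional(True))   # ['merge', 'skip_dynamic_refresh']
    print(build_conditional(False))  # ['merge', 'refresh_tables']
    # Task-level skip yields one graph; only the run result changes
    print(build_task_level())        # ['merge', 'refresh_tables']
    print(run_task_level({"skip_dynamic_refresh": True}))  # {'status': 'skipped'}
```

In the toy model only `run_task_level` looks at the parameter, which mirrors the "Consistent DAG Structure" and "Runtime Decision" principles.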
Alternative 3: Multi-Level Parameter Passing
Pattern:
```python
# DAG level
skip_flag = check_skip_refresh()

# Task group level
@task_group
def refresh_group(merge_results: list, skip_refresh: bool):
    if skip_refresh:
        return skip_task()
    return refresh_tables(merge_results)

# Task function level
@task
def refresh_tables(merge_results: list, **context):
    # Check context again...
    ...
```
Rejected Because:
- ❌ Parameter extraction duplicated (DAG, task group, task function)
- ❌ Inconsistent (sometimes skipped at the task group, sometimes at the task function)
- ❌ Harder to maintain (logic in multiple places)
Consequences
Positive
✅ Simpler DAG Structure: Fewer tasks (no parameter extraction tasks)
✅ Consistent Graph: Task always appears in DAG graph regardless of parameter value
✅ Runtime Flexibility: Can change behavior with runtime parameters (no DAG reparse needed)
✅ Better Debugging: Skip logic centralized in one place with clear logging
✅ Cleaner Templates: Less conditional Jinja logic in DAG generation templates
✅ Single Source of Truth: Parameter extracted once in task function
Negative
⚠️ Task Shows as "Success" When Skipped: The task runs and completes successfully even when it skips its work, so Airflow records it as success rather than in the "skipped" state. Mitigated by clear logging and the returned status.
⚠️ Must Accept **context: Task functions must accept **context (or call get_current_context() from airflow.sdk) to read params. This is a standard Airflow pattern, but easy to forget.
Trade-offs Accepted
- Task execution overhead when skipped (the task still starts, checks the parameter, and returns)
  - Acceptable: the check itself takes milliseconds; the task is still scheduled and briefly occupies a worker slot, which is worth the cleaner code
- Skip logic lives inside the task function rather than in the DAG structure
  - Acceptable: better debugging and maintainability outweigh a slightly less explicit DAG graph
Implementation Details
When to Use This Pattern
Use task-level skip logic when:
- ✅ You need runtime parameter toggles (e.g., skip operations during backfills)
- ✅ You want a consistent DAG structure across runs
- ✅ The task should show in the graph but execute conditionally
- ✅ The parameter affects execution, not DAG structure

Don't use it when:
- ❌ The task shouldn't exist at all in certain scenarios (use conditional DAG generation)
- ❌ The decision is known at parse time and never changes (use Jinja conditionals)
Access Pattern for DAG Parameters
```python
import logging

from airflow.sdk import task

logger = logging.getLogger(__name__)


@task
def my_task(data: dict, **context):
    # Get the parameter with a default fallback ("parameter_name" is a placeholder)
    param_value = context.get("params", {}).get("parameter_name", False)
    if param_value:
        logger.info("Parameter enabled: %s", param_value)
        # ... conditional logic ...
    else:
        logger.info("Parameter disabled, executing normal flow")
        # ... normal logic ...
```
Logging Best Practices
Always log when skip logic executes:
```python
if skip_refresh:
    logger.info("Skipping dynamic table refresh (skip_dynamic_refresh=True)")
    return {"status": "skipped", "message": "Refresh skipped via parameter"}
```
Return a clear status dictionary:
```python
return {
    "status": "skipped",  # or "success", "partial"
    "message": "Human-readable explanation",
    "tables_skipped": [...],  # Optional: what was skipped
    "total_tables": len(tables),
}
```
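One way to keep this shape consistent across tasks is a small typed builder. The sketch below is hypothetical: `TaskStatus` and `make_status` are illustrative names, not existing xo-foundry APIs, and it uses only the standard library. A TypedDict is an ordinary dict at runtime, so the return value can be returned from a task just like the literal above. The `Literal` status values are checked only by static type checkers, not at runtime.

```python
from typing import List, Literal, Optional, TypedDict

# Allowed status values, taken from the comment in the dictionary above
Status = Literal["success", "skipped", "partial"]


class _StatusRequired(TypedDict):
    status: Status
    message: str


class TaskStatus(_StatusRequired, total=False):
    # Optional keys: included only when the task knows them
    tables_skipped: List[str]
    total_tables: int


def make_status(
    status: Status,
    message: str,
    tables_skipped: Optional[List[str]] = None,
    total_tables: Optional[int] = None,
) -> TaskStatus:
    """Build a status dict with the required keys plus any optional keys provided."""
    result: TaskStatus = {"status": status, "message": message}
    if tables_skipped is not None:
        result["tables_skipped"] = list(tables_skipped)
    if total_tables is not None:
        result["total_tables"] = total_tables
    return result


if __name__ == "__main__":
    tables = ["orders", "customers"]
    print(make_status("skipped", "Refresh skipped via parameter",
                      tables_skipped=tables, total_tables=len(tables)))
```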
Example: Sprout Pipeline Dynamic Refresh
Before (Complex):
```python
# DAG level
@task
def check_skip_refresh(**context):
    return context.get("params", {}).get("skip_dynamic_refresh", False)

skip_refresh_flag = check_skip_refresh()

# Task group level
@task_group
def refresh_dynamic_tables_group(merge_results: list, skip_refresh: bool = False):
    if skip_refresh:
        @task
        def skip_dynamic_refresh():
            return {"status": "skipped"}

        return skip_dynamic_refresh()
    # ... normal logic ...
```
After (Simple):
```python
# DAG level - just pass merge results
refresh_result = refresh_dynamic_tables_group(merge_results)

# Task group level - no parameter checking
@task_group
def refresh_dynamic_tables_group(merge_results: list):
    refresh_result = refresh_dynamic_tables(all_tables, merge_results[0])
    return refresh_result

# Task function level - owns skip logic
@task
def refresh_dynamic_tables(tables: list, merge_result: dict, **context):
    skip_refresh = context.get("params", {}).get("skip_dynamic_refresh", False)
    if skip_refresh:
        logger.info("Skipping refresh (skip_dynamic_refresh=True)")
        return {"status": "skipped", "tables_skipped": tables}
    # Execute refresh...
```
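Because the skip decision depends only on `context["params"]`, the logic can be unit-tested without a scheduler by passing a hand-built context dict. This sketch assumes the body of `refresh_dynamic_tables` is kept in a plain, undecorated function. `refresh_dynamic_tables_impl` and the `tables_refreshed` key are hypothetical names, and a stub replaces the real refresh calls.

```python
import logging

logger = logging.getLogger(__name__)


def refresh_dynamic_tables_impl(tables: list, merge_result: dict, **context) -> dict:
    """Hypothetical undecorated task body; a @task wrapper would delegate to it."""
    skip_refresh = context.get("params", {}).get("skip_dynamic_refresh", False)
    if skip_refresh:
        logger.info("Skipping refresh (skip_dynamic_refresh=True)")
        return {"status": "skipped", "tables_skipped": tables}
    # Stub standing in for the real refresh calls
    logger.info("Executing refresh for %d tables", len(tables))
    return {"status": "success", "tables_refreshed": tables}


def test_skip_when_param_true():
    result = refresh_dynamic_tables_impl(["t1"], {}, params={"skip_dynamic_refresh": True})
    assert result == {"status": "skipped", "tables_skipped": ["t1"]}


def test_refresh_by_default():
    # No params in the context: the .get() fallbacks make the toggle False
    result = refresh_dynamic_tables_impl(["t1"], {})
    assert result["status"] == "success"


if __name__ == "__main__":
    test_skip_when_param_true()
    test_refresh_by_default()
    print("ok")
```

The `test_` functions follow pytest naming, so they would also be collected by pytest if placed in a test module.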
Related Documentation
- CLAUDE.md: Pattern 6 - Task-Level Skip Logic (Airflow DAG Toggles)
- Project Archive: .claude/ongoing/archived/2026-01-05-sprout-data-pipeline/
- Implementation: packages/xo-foundry/src/xo_foundry/tasks/merge_tasks.py:refresh_dynamic_tables()
- Template: packages/xo-foundry/src/xo_foundry/dag_factory/templates/legacy_elt.py.j2
Lessons Learned
- Simplicity Wins: Extra tasks for parameter extraction add complexity without benefit
- Runtime Flexibility Matters: Users need to change behavior without redeploying DAGs
- Consistent Graph Structure: Conditional task creation makes debugging harder
- Centralized Logic: Skip logic in one place is easier to understand and maintain
- Airflow Context is Powerful: Built-in mechanism for passing runtime parameters to tasks
Future Considerations
- Consider standardizing return format for skipped tasks across all pipelines
- Document pattern in xo-foundry developer guide
- Create helper function for parameter extraction with logging
- Apply pattern to other toggles (debug mode, dry run, etc.)
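A helper for parameter extraction with logging could look roughly like the following. `get_toggle` is a hypothetical name, and the string handling is an assumption about how toggle values might arrive in a trigger payload. It guards against a real Python pitfall: any non-empty string, including "false", is truthy, so a bare `if param_value:` would treat it as enabled.

```python
import logging
from typing import Any, Mapping

logger = logging.getLogger(__name__)

# Assumed spellings accepted for string-valued toggles
_TRUE_STRINGS = {"true", "1", "yes", "on"}
_FALSE_STRINGS = {"false", "0", "no", "off", ""}


def get_toggle(context: Mapping[str, Any], name: str, default: bool = False) -> bool:
    """Read a boolean DAG param from a task context and log the resolved value."""
    raw = context.get("params", {}).get(name, default)
    if isinstance(raw, str):
        lowered = raw.strip().lower()
        if lowered in _TRUE_STRINGS:
            value = True
        elif lowered in _FALSE_STRINGS:
            value = False
        else:
            raise ValueError(f"Param {name!r} has non-boolean value {raw!r}")
    else:
        value = bool(raw)
    logger.info("Toggle %s resolved to %s (raw=%r)", name, value, raw)
    return value


if __name__ == "__main__":
    ctx = {"params": {"dry_run": "false", "debug_mode": True}}
    print(get_toggle(ctx, "dry_run"))               # False
    print(get_toggle(ctx, "debug_mode"))            # True
    print(get_toggle(ctx, "skip_dynamic_refresh"))  # False (default)
```

Raising on unrecognized strings is a design choice: it fails the task loudly instead of silently guessing what an ambiguous value like "maybe" means.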