
ADR 007: Task-Level Skip Logic for Airflow DAG Parameters

Status: Accepted
Date: 2026-01-05
Context: Implementing runtime toggles for Airflow DAG operations


Problem Statement

How should we implement runtime parameter toggles in Airflow DAGs (e.g., skip_dynamic_refresh, debug_mode) that control whether certain tasks execute?

Use Case: Skip dynamic table refresh during backfill operations to avoid unnecessary warehouse compute, while still showing the task in the DAG graph.

Constraints:

  • Airflow TaskFlow API (3.0)
  • Jinja2 DAG generation templates
  • Need to access DAG-level parameters in task functions
  • DAG structure should be consistent (tasks always present in graph)


Decision

Implement skip logic at the task function level using Airflow context parameters.

Task functions check context["params"][parameter_name] and decide whether to skip execution, rather than:

  • Creating extra tasks to extract parameters
  • Conditionally creating tasks based on parameters
  • Checking parameters at multiple levels (DAG → task group → task)

Implementation Pattern

import logging

from airflow.decorators import dag, task, task_group

logger = logging.getLogger(__name__)

# DAG level - declare parameter with default value
@dag(params={"skip_dynamic_refresh": False})
def my_pipeline():
    # Task group level - always create the task (no parameter checking)
    @task_group
    def refresh_group(merge_results: list):
        # Always call the task - no conditional logic
        refresh_result = refresh_tables(merge_results)
        return refresh_result

    # Task function level - check context and decide to skip
    @task
    def refresh_tables(merge_results: list, **context):
        skip_refresh = context.get("params", {}).get("skip_dynamic_refresh", False)

        if skip_refresh:
            logger.info("Skipping refresh (skip_dynamic_refresh=True)")
            return {"status": "skipped", "message": "Refresh skipped via parameter"}

        # Otherwise execute normal logic
        logger.info("Executing refresh")
        # ... actual refresh code ...

my_pipeline()
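Because the skip decision is ordinary Python inside the task function, it can be exercised without a scheduler by handing it a fake context dict. A minimal sketch (decide_skip is an illustrative stand-alone version of the check in refresh_tables, not a function from the codebase):

```python
def decide_skip(context: dict) -> bool:
    """The same check refresh_tables performs, isolated for testing."""
    return context.get("params", {}).get("skip_dynamic_refresh", False)

# Simulate the runtime cases with fake task contexts
assert decide_skip({"params": {"skip_dynamic_refresh": True}}) is True
assert decide_skip({"params": {}}) is False  # param omitted -> default
assert decide_skip({}) is False              # no params key at all
```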

Key Principles

  1. Single Source of Truth: Parameter extracted once in task function
  2. Consistent DAG Structure: Task always appears in graph (not conditionally created)
  3. Runtime Decision: Skip decision made at execution time, not parse time
  4. Clear Ownership: Task function owns skip logic, not DAG or task group

Alternatives Considered

Alternative 1: Extra Parameter Extraction Task

Pattern:

@task
def check_skip_refresh(**context):
    return context.get("params", {}).get("skip_dynamic_refresh", False)

skip_flag = check_skip_refresh()
refresh_result = refresh_group(merge_results, skip_refresh=skip_flag)

Rejected Because:

  • ❌ Extra task overhead (additional node in DAG graph)
  • ❌ Parameter extracted at multiple levels (task, then task group, then function)
  • ❌ More complex task dependencies
  • ❌ Harder to debug (logic spread across multiple locations)

Alternative 2: Conditional Task Creation

Pattern:

@task_group
def refresh_group(merge_results: list, skip_refresh: bool):
    if skip_refresh:
        @task
        def skip_dynamic_refresh():
            return {"status": "skipped"}
        return skip_dynamic_refresh()
    else:
        return refresh_tables(merge_results)

Rejected Because:

  • ❌ DAG structure changes based on parameter (different tasks created)
  • ❌ Parse-time decision (parameter must be known when DAG is parsed)
  • ❌ Task graph inconsistent across runs
  • ❌ Can't change behavior with runtime parameters

Alternative 3: Multi-Level Parameter Passing

Pattern:

# DAG level
skip_flag = check_skip_refresh()

# Task group level
@task_group
def refresh_group(merge_results: list, skip_refresh: bool):
    if skip_refresh:
        return skip_task()
    return refresh_tables(merge_results)

# Task function level
@task
def refresh_tables(merge_results: list, **context):
    # Check context again...

Rejected Because:

  • ❌ Parameter extraction duplicated (DAG, task group, task function)
  • ❌ Inconsistent (sometimes skip at task group, sometimes at task function)
  • ❌ Harder to maintain (logic in multiple places)


Consequences

Positive

Simpler DAG Structure: Fewer tasks (no parameter extraction tasks)

Consistent Graph: Task always appears in DAG graph regardless of parameter value

Runtime Flexibility: Can change behavior with runtime parameters (no DAG reparse needed)

Better Debugging: Skip logic centralized in one place with clear logging

Cleaner Templates: Less conditional Jinja logic in DAG generation templates

Single Source of Truth: Parameter extracted once in task function

Negative

⚠️ Task Shows as "Success" When Skipped: Task runs and returns success even when skipped (not a "skipped" state in Airflow). Mitigated by clear logging and return status.
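If a true "skipped" state in the Airflow UI is ever preferred over a successful return, Airflow's built-in AirflowSkipException (from airflow.exceptions) can be raised instead. A minimal sketch of the idea, using a stand-in exception class so the snippet runs without an Airflow installation (refresh_tables_body is an illustrative name):

```python
# Real DAG code would use: from airflow.exceptions import AirflowSkipException
class AirflowSkipException(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException in this sketch."""

def refresh_tables_body(params: dict) -> dict:
    """Core of a task function; raising marks the task instance as 'skipped'."""
    if params.get("skip_dynamic_refresh", False):
        raise AirflowSkipException("Refresh skipped via parameter")
    return {"status": "success"}
```

The trade-off is that a raised exception short-circuits the function, so the task cannot also return a status dict for downstream consumers; the accepted decision above keeps the return value at the cost of a "success" state.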

⚠️ Must Use **context Parameter: Task functions must accept **context to access params. Standard Airflow pattern, but easy to forget.

Trade-offs Accepted

  • Task execution overhead when skipped (runs briefly to check parameter and return)
    Acceptable: overhead is minimal (milliseconds), and the cleaner code is worth it
  • Skip logic inside task function rather than DAG structure
    Acceptable: better debugging and maintainability outweigh a slightly less explicit DAG graph

Implementation Details

When to Use This Pattern

Use task-level skip logic when:

  • ✅ Need runtime parameter toggles (e.g., skip operations during backfills)
  • ✅ Want consistent DAG structure across runs
  • ✅ Task should show in graph but conditionally execute
  • ✅ Parameter affects execution, not DAG structure

Don't use when:

  • ❌ Task shouldn't exist at all in certain scenarios (use conditional DAG generation)
  • ❌ Decision is known at parse time and never changes (use Jinja conditionals)
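For the parse-time case, the Jinja2 generation template can simply omit the task. A tiny illustration using the jinja2 library (the template fragment is invented for illustration, not taken from legacy_elt.py.j2):

```python
from jinja2 import Template

# Hypothetical fragment of a DAG-generation template: the refresh call is
# only emitted into the generated DAG file when the parse-time config
# enables it, so the task never exists in the disabled case.
TEMPLATE = Template(
    "{% if enable_refresh %}refresh_result = refresh_tables(merge_results)\n{% endif %}"
)

with_refresh = TEMPLATE.render(enable_refresh=True)      # contains the task call
without_refresh = TEMPLATE.render(enable_refresh=False)  # empty: task never generated
```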

Access Pattern for DAG Parameters

@task
def my_task(data: dict, **context):
    # Get parameter with default fallback
    param_value = context.get("params", {}).get("parameter_name", default_value)

    if param_value:
        logger.info(f"Parameter enabled: {param_value}")
        # ... conditional logic ...
    else:
        logger.info("Parameter disabled, executing normal flow")
        # ... normal logic ...

Logging Best Practices

Always log when skip logic executes:

if skip_refresh:
    logger.info("Skipping dynamic table refresh (skip_dynamic_refresh=True)")
    return {"status": "skipped", "message": "Refresh skipped via parameter"}

Return a clear status dictionary:

return {
    "status": "skipped",  # or "success", "partial"
    "message": "Human-readable explanation",
    "tables_skipped": [...],  # Optional: what was skipped
    "total_tables": len(tables)
}
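A downstream task or reporting step can then branch on that status. A small sketch that aggregates such dicts (summarize_refresh is an illustrative name, not from the codebase):

```python
def summarize_refresh(results: list) -> dict:
    """Roll up status dicts returned by refresh tasks into one summary."""
    skipped = [r for r in results if r.get("status") == "skipped"]
    return {
        "runs": len(results),
        "skipped": len(skipped),
        "all_skipped": bool(results) and len(skipped) == len(results),
    }
```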


Example: Sprout Pipeline Dynamic Refresh

Before (Complex):

# DAG level
@task
def check_skip_refresh(**context):
    return context.get("params", {}).get("skip_dynamic_refresh", False)

skip_refresh_flag = check_skip_refresh()

# Task group level
@task_group
def refresh_dynamic_tables_group(merge_results: list, skip_refresh: bool = False):
    if skip_refresh:
        @task
        def skip_dynamic_refresh():
            return {"status": "skipped"}
        return skip_dynamic_refresh()

    # ... normal logic ...

After (Simple):

# DAG level - just pass merge results
refresh_result = refresh_dynamic_tables_group(merge_results)

# Task group level - no parameter checking
@task_group
def refresh_dynamic_tables_group(merge_results: list):
    refresh_result = refresh_dynamic_tables(all_tables, merge_results[0])
    return refresh_result

# Task function level - owns skip logic
@task
def refresh_dynamic_tables(tables: list, merge_result: dict, **context):
    skip_refresh = context.get("params", {}).get("skip_dynamic_refresh", False)

    if skip_refresh:
        logger.info("Skipping refresh (skip_dynamic_refresh=True)")
        return {"status": "skipped", "tables_skipped": tables}

    # Execute refresh...


References

  • CLAUDE.md: Pattern 6 - Task-Level Skip Logic (Airflow DAG Toggles)
  • Project Archive: .claude/ongoing/archived/2026-01-05-sprout-data-pipeline/
  • Implementation: packages/xo-foundry/src/xo_foundry/tasks/merge_tasks.py:refresh_dynamic_tables()
  • Template: packages/xo-foundry/src/xo_foundry/dag_factory/templates/legacy_elt.py.j2

Lessons Learned

  1. Simplicity Wins: Extra tasks for parameter extraction add complexity without benefit
  2. Runtime Flexibility Matters: Users need to change behavior without redeploying DAGs
  3. Consistent Graph Structure: Conditional task creation makes debugging harder
  4. Centralized Logic: Skip logic in one place is easier to understand and maintain
  5. Airflow Context is Powerful: Built-in mechanism for passing runtime parameters to tasks

Future Considerations

  • Consider standardizing return format for skipped tasks across all pipelines
  • Document pattern in xo-foundry developer guide
  • Create helper function for parameter extraction with logging
  • Apply pattern to other toggles (debug mode, dry run, etc.)
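A sketch of what that parameter-extraction helper might look like (get_toggle is a hypothetical name, not an existing function):

```python
import logging

logger = logging.getLogger(__name__)

def get_toggle(context: dict, name: str, default: bool = False) -> bool:
    """Read a boolean DAG param from the Airflow task context, logging the decision."""
    value = bool(context.get("params", {}).get(name, default))
    logger.info("DAG param %s resolved to %s (default=%s)", name, value, default)
    return value
```

A task function would then call get_toggle(context, "skip_dynamic_refresh") instead of repeating the nested .get() chain and ad-hoc logging in every task.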