ADR 005: Configurable Snowflake Stage Name

Status

Accepted

Context

The Snowflake COPY INTO operation requires a stage to reference S3 files. Different databases and schemas may have different stage locations:

  • CORE_DB_DEV.BRONZE.XO_S3_STAGE - Development client databases
  • CORE_DB_PROD.BRONZE.XO_S3_STAGE - Production client databases
  • OPERATIONS.DEV_STAGING.XO_S3_STAGE - Operations database (Condé Nast Sprout)
  • Custom stages for specific use cases

The original implementation hardcoded stage name detection based on database name patterns (_DEV, _PROD), which didn't work for databases like OPERATIONS that don't follow this convention.

Decision

Three-Tier Priority System

Stage name resolution follows this priority:

  1. YAML Configuration (highest priority)

    globals:
      snowflake:
        stage_name: OPERATIONS.DEV_STAGING.XO_S3_STAGE
    

  2. Airflow Variable (medium priority)

    stage_name = Variable.get("SNOWFLAKE_STAGE_NAME", default_var=None)
    

  3. Default Based on Database (fallback)

    if "_DEV" in database:
        stage_name = "CORE_DB_DEV.BRONZE.XO_S3_STAGE"
    elif "_PROD" in database:
        stage_name = "CORE_DB_PROD.BRONZE.XO_S3_STAGE"
    else:
        stage_name = "XO_S3_STAGE"  # Unqualified fallback
    

Implementation Details

Schema Addition (dag_config.py):

class SnowflakeGlobalConfig(BaseModel):
    database: str
    schema_: str  # trailing underscore: "schema" shadows a BaseModel attribute
    connection_id: str = "snowflake_default"
    warehouse: str | None = None
    stage_name: str | None = None  # NEW: Optional stage name

Task Logic (snowflake_tasks.py):

# Priority 1: Config
stage_name = snowflake_config.get("stage_name")

# Priority 2: Airflow Variable
if not stage_name:
    stage_name = Variable.get("SNOWFLAKE_STAGE_NAME", default_var=None)

# Priority 3: Default
if not stage_name:
    database = snowflake_config.get("database", "")
    if "_DEV" in database:
        stage_name = "CORE_DB_DEV.BRONZE.XO_S3_STAGE"
    elif "_PROD" in database:
        stage_name = "CORE_DB_PROD.BRONZE.XO_S3_STAGE"
    else:
        stage_name = "XO_S3_STAGE"
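The task logic above can be collected into a single testable function. The sketch below is illustrative, not the actual module API: `resolve_stage_name` is a hypothetical helper, and `get_variable` stands in for Airflow's `Variable.get` so the priority logic can be exercised without an Airflow deployment.

```python
from typing import Callable, Mapping, Optional


def resolve_stage_name(
    snowflake_config: Mapping[str, str],
    get_variable: Callable[[str], Optional[str]],
) -> str:
    """Resolve the stage name using the three-tier priority order."""
    # Priority 1: explicit YAML configuration
    stage_name = snowflake_config.get("stage_name")

    # Priority 2: global Airflow Variable override
    if not stage_name:
        stage_name = get_variable("SNOWFLAKE_STAGE_NAME")

    # Priority 3: infer from the database name, with an
    # unqualified fallback for unrecognized conventions
    if not stage_name:
        database = snowflake_config.get("database", "")
        if "_DEV" in database:
            stage_name = "CORE_DB_DEV.BRONZE.XO_S3_STAGE"
        elif "_PROD" in database:
            stage_name = "CORE_DB_PROD.BRONZE.XO_S3_STAGE"
        else:
            stage_name = "XO_S3_STAGE"
    return stage_name
```

For example, `resolve_stage_name({"database": "OPERATIONS"}, lambda k: None)` falls through to the unqualified `"XO_S3_STAGE"` fallback, which is exactly the failure mode that motivated making the stage configurable.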

Template Support (both snowflake_load.py.j2 and intraday_refresh.py.j2):

BASE_SNOWFLAKE_CONFIG = {
    "database": DATABASE,
    "schema": SCHEMA,
    ...
{% if stage_name %}    "stage_name": "{{ stage_name }}",
{% endif %}}

Consequences

Positive

  • Flexibility: Each pipeline can specify its own stage
  • Clarity: Stage name is explicit in YAML config (self-documenting)
  • Backward Compatibility: Existing DAGs without stage_name still work via fallback
  • Environment Isolation: Different stages for dev/staging/prod
  • Database Independence: Works with any database naming convention

Negative

  • Configuration Burden: Users must know the correct stage name for their database
  • Error Potential: An incorrect stage name causes a runtime failure rather than being caught at config validation
  • Duplication: The stage name appears in both the YAML config and the Snowflake DDL
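The error-potential drawback could be softened with a cheap format check at config-load time. The sketch below is a hypothetical `validate_stage_name` helper, not part of the current schema; it only rejects names that cannot possibly be valid, since existence can still only be verified against Snowflake at runtime.

```python
import re

# A stage reference is at most three dot-separated identifiers
# (DATABASE.SCHEMA.STAGE); a bare stage name is also legal but
# resolves against the session's current database/schema.
_STAGE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*){0,2}$")


def validate_stage_name(stage_name: str) -> str:
    """Reject obviously malformed stage names before any DAG runs."""
    if not _STAGE_RE.match(stage_name):
        raise ValueError(f"Malformed Snowflake stage name: {stage_name!r}")
    return stage_name
```

This shifts a whole class of typos (extra dots, empty parts, illegal characters) from a runtime COPY INTO failure to a config-parse error, without requiring a network round trip.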

Neutral

  • Airflow Variable Still Supported: Allows global override if needed
  • Default Behavior Unchanged: Existing logic still works for _DEV/_PROD databases

Use Cases

Use Case 1: Client Databases (Existing Pattern)

# Warby Parker Development
globals:
  snowflake:
    database: WBP_DB_DEV
    # stage_name not specified, defaults to CORE_DB_DEV.BRONZE.XO_S3_STAGE

Use Case 2: Operations Database (New Pattern)

# Condé Nast Sprout (Operations database)
globals:
  snowflake:
    database: OPERATIONS
    schema: STAGING
    stage_name: OPERATIONS.DEV_STAGING.XO_S3_STAGE  # Required, no default

Use Case 3: Custom Stage

# Special use case with dedicated stage
globals:
  snowflake:
    database: ANALYTICS_DB
    schema: RAW
    stage_name: ANALYTICS_DB.RAW.CUSTOM_STAGE

Use Case 4: Global Airflow Variable

# In Airflow UI, set variable
SNOWFLAKE_STAGE_NAME = "SHARED_STAGE"
# All DAGs without explicit stage_name will use this

Implementation Timeline

  • 2025-12-11: Implemented and deployed with the Condé Nast Sprout pipeline
  • Files Changed:
      • packages/xo-foundry/src/xo_foundry/schemas/dag_config.py
      • packages/xo-foundry/src/xo_foundry/dag_factory/factory.py
      • packages/xo-foundry/src/xo_foundry/tasks/snowflake_tasks.py
      • packages/xo-foundry/src/xo_foundry/dag_factory/templates/snowflake_load.py.j2
      • packages/xo-foundry/src/xo_foundry/dag_factory/templates/intraday_refresh.py.j2

Alternatives Considered

Alternative 1: Always Require Stage Name in YAML

Approach: Make stage_name a required field, remove defaults

Rejected because:

  • Breaking change for existing DAGs
  • Forces users to know the stage name for every pipeline
  • Loses the convenience of smart defaults

Alternative 2: Database-to-Stage Mapping Table

Approach: Maintain mapping of database → stage in config

stage_mappings:
  WBP_DB_DEV: CORE_DB_DEV.BRONZE.XO_S3_STAGE
  OPERATIONS: OPERATIONS.DEV_STAGING.XO_S3_STAGE

Rejected because:

  • Central config becomes a bottleneck
  • Doesn't support ad-hoc databases
  • More complex than the current solution

Alternative 3: Auto-Discovery via Snowflake Query

Approach: Query Snowflake for available stages, pick first match

Rejected because:

  • Runtime overhead on every DAG run
  • Unpredictable behavior if multiple stages exist
  • Network dependency for configuration

Migration Guide

For Existing DAGs (No Changes Needed)

Databases following _DEV/_PROD pattern continue working:

# No change needed
globals:
  snowflake:
    database: WBP_DB_DEV  # Auto-resolves to CORE_DB_DEV.BRONZE.XO_S3_STAGE

For New OPERATIONS Pipelines

Explicitly specify stage name:

# Required for OPERATIONS database
globals:
  snowflake:
    database: OPERATIONS
    stage_name: OPERATIONS.DEV_STAGING.XO_S3_STAGE

For Custom Stages

Specify exact stage location:

globals:
  snowflake:
    stage_name: CUSTOM_DB.CUSTOM_SCHEMA.CUSTOM_STAGE

References

  • Issue: OPERATIONS.STAGING.XO_S3_STAGE does not exist (incorrect auto-detection)
  • Fix: Added stage_name to SnowflakeGlobalConfig
  • Deployment: Condé Nast Sprout production DAG (2025-12-11)