ADR 002: Intraday Refresh Patterns

Status: Proposed
Date: 2025-12-09
Context: Condé Nast Sprout Social pipeline design


Problem Statement

How should we handle multiple daily refreshes (intraday patterns) for APIs that return updated records within a time window?

Specific Requirements:

  • Condé Nast needs 2 refreshes daily (8 AM EST and 5 PM EST)
  • Sprout API returns messages created/updated within a time window
  • Need to avoid data loss during transition periods
  • Future clients may need 4x/day (every 6 hours) or hourly patterns


Decision

Use separate DAGs with overlapping time windows and idempotent deduplication in Snowflake.

Pattern Structure

Refresh N: (previous window end - overlap) → (current run time - buffer)

Morning DAG:  Yesterday 16:30 → Today 07:30 (15h window)
Evening DAG:  Today 07:00 → Today 16:30 (9.5h window, 30-min overlap at start)

Key Principles

  1. Overlapping Windows: 30-minute overlap between consecutive refreshes
  2. Buffers: 30-minute buffer before each run time to avoid incomplete data
  3. Separate DAGs: One DAG per refresh time (not branching within single DAG)
  4. Idempotent Loads: Snowflake MERGE on unique key handles duplicates
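
The pattern and principles above can be sketched in a few lines (names here are illustrative, not project code):

```python
from datetime import datetime, timedelta

OVERLAP = timedelta(minutes=30)
BUFFER = timedelta(minutes=30)

def refresh_window(previous_end: datetime, run_time: datetime) -> tuple[datetime, datetime]:
    """Window for refresh N: (previous window end - overlap) -> (run time - buffer)."""
    return previous_end - OVERLAP, run_time - BUFFER

# Evening run at 17:00; the morning window ended at 07:30:
start, end = refresh_window(datetime(2025, 12, 9, 7, 30), datetime(2025, 12, 9, 17, 0))
# start = 07:00, end = 16:30 (the evening window shown above)
```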

Options Considered

Option 1: Separate DAGs with Overlapping Windows ✅ SELECTED

Implementation:

# condenast_sprout_morning_refresh_dag.py
from airflow.decorators import dag

@dag(schedule="0 8 * * 1-5", catchup=False)
def condenast_sprout_morning_refresh():
    """Morning window: Yesterday 4:30 PM → Today 7:30 AM"""

condenast_sprout_morning_refresh()

# condenast_sprout_evening_refresh_dag.py
from airflow.decorators import dag

@dag(schedule="0 17 * * 1-5", catchup=False)
def condenast_sprout_evening_refresh():
    """Evening window: Today 7:00 AM → Today 4:30 PM"""

condenast_sprout_evening_refresh()

Pros:

  • ✅ Clear separation - easy to monitor each refresh independently
  • ✅ Simple to understand - explicit time windows
  • ✅ Easy to modify one refresh without affecting the other
  • ✅ No complex branching logic
  • ✅ Independent SLAs and alerting per refresh
  • ✅ Scales to N refreshes (just add more DAGs)

Cons:

  • ❌ Code duplication (minimal - shared tasks in xo-foundry)
  • ❌ Multiple DAGs to maintain (but clear ownership)


Option 2: Single DAG with Branching

Implementation:

# Note: a plain list here is not two cron schedules - Airflow interprets a
# list as dataset triggers, so dual crons would need a custom timetable.
@dag(schedule=["0 8 * * 1-5", "0 17 * * 1-5"])  # illustrative only
def condenast_sprout_dual_refresh():
    @task.branch
    def determine_refresh_type(**context):
        if context["logical_date"].hour == 8:
            return "morning_window"
        return "evening_window"

Pros:

  • ✅ Single DAG reduces proliferation

Cons:

  • ❌ Branching logic adds complexity
  • ❌ Harder to debug which refresh failed
  • ❌ Less clear in Airflow UI
  • ❌ Difficult to set different SLAs per refresh
  • ❌ Doesn't scale well to 4+ refreshes per day


Option 3: Single DAG with Non-Overlapping Windows

Implementation:

# Morning: Yesterday 5:00 PM → Today 8:00 AM (exact cutoff)
# Evening: Today 8:00 AM → Today 5:00 PM (exact cutoff)

Pros:

  • ✅ No duplicate data pulled from API

Cons:

  • ❌ DATA LOSS RISK: Records created/updated at 8:00:00 AM could be missed
  • ❌ If the morning job fails, a gap appears in data coverage
  • ❌ Assumes the API returns perfectly consistent data at cutoff times
  • ❌ NOT RECOMMENDED for APIs returning "updated since" data
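
A toy illustration of the cutoff risk (the query predicates are hypothetical): a record updated at exactly 8:00:00 AM falls outside both windows if the boundaries are exclusive on both sides.

```python
from datetime import datetime

cutoff = datetime(2025, 12, 9, 8, 0, 0)
record_updated_at = cutoff  # updated exactly at the 8:00:00 AM boundary

in_morning = record_updated_at < cutoff   # morning: updated_at < 8:00 AM
in_evening = record_updated_at > cutoff   # evening: updated_at > 8:00 AM
dropped = not in_morning and not in_evening  # the record is silently lost
```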


Option 4: Continuous Streaming (Airflow Sensors)

Implementation:

from datetime import timedelta

from airflow.decorators import dag
from airflow.sensors.time_delta import TimeDeltaSensor

@dag(schedule=None, catchup=False)  # schedule_interval is deprecated
def condenast_sprout_streaming():
    wait = TimeDeltaSensor(task_id="wait_30_min", delta=timedelta(minutes=30))
    extract = extract_sprout_messages()
    wait >> extract

Pros:

  • ✅ Most real-time data

Cons:

  • ❌ Overkill for this use case
  • ❌ Complex sensor management
  • ❌ Higher infrastructure costs
  • ❌ API rate limiting concerns
  • ❌ Harder to reason about data completeness


Rationale for Decision

Why Separate DAGs?

  1. Operational Clarity
     • Each refresh appears as a distinct pipeline in the Airflow UI
     • Easy to see "morning refresh succeeded, evening failed"
     • Clear ownership and responsibility

  2. Independent Monitoring
     • Set different SLAs: Morning must complete by 9 AM, Evening by 6 PM
     • Separate alerting: Page for morning failures, email for evening
     • Track success rates independently

  3. Flexible Evolution
     • Easy to add a 3rd refresh (e.g., noon) without changing existing DAGs
     • Can disable one refresh without affecting others
     • Simple to modify buffer/overlap per refresh

  4. Testing & Backfills
     • Can backfill morning refreshes independently
     • Test evening refresh logic without running morning
     • Easier to reason about state

Why Overlapping Windows?

  1. Data Completeness Guarantee
     • API may have eventual consistency
     • Records updated at 8:00:00 AM could appear in either query
     • 30-minute overlap catches edge cases

  2. Failure Resilience
     • If the morning job fails, the evening job still captures the overlap period
     • Reduces urgency to immediately fix morning failures

  3. API Quirks
     • Timezone handling inconsistencies
     • Rounding differences in timestamp comparisons
     • Network timing variations

Idempotent Loading is Key

-- Snowflake MERGE handles duplicates gracefully
MERGE INTO bronze.sprout_messages AS target
USING staging.sprout_messages AS source
ON target.guid = source.guid
   AND target.updated_at = source.updated_at
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

This makes overlapping windows safe and efficient.
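
A toy in-memory sketch of the MERGE semantics: an upsert keyed on (guid, updated_at), so re-loading rows from an overlapping window changes nothing.

```python
# Minimal dict-based model of MERGE ... ON guid AND updated_at.
def merge_rows(target: dict, batch: list[dict]) -> dict:
    for row in batch:
        target[(row["guid"], row["updated_at"])] = row
    return target

table: dict = {}
batch = [{"guid": "m1", "updated_at": "2025-12-09T07:15:00", "text": "hi"}]
merge_rows(table, batch)
merge_rows(table, batch)  # overlapping re-load is a no-op
```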


Scaling Patterns

2 Refreshes/Day (Condé Nast)

Morning:  Yesterday 16:30 → Today 07:30 (15h)
Evening:  Today 07:00 → Today 16:30 (9.5h)
Overlap:  30 minutes

4 Refreshes/Day (Every 6 Hours)

Refresh 1 (06:00): Yesterday 23:00 → Today 05:30 (6.5h)
Refresh 2 (12:00): Today 05:00 → Today 11:30 (6.5h)
Refresh 3 (18:00): Today 11:00 → Today 17:30 (6.5h)
Refresh 4 (00:00): Today 17:00 → Today 23:30 (6.5h)
Overlap:  30 minutes each

24 Refreshes/Day (Hourly)

Hour N (XX:00): (XX-2):00 → (XX-1):30 (1.5h, including the 30-min overlap)
Overlap:  30 minutes each

Pattern: Always use (previous window end - overlap) → (current run time - buffer)
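
The scaling pattern can be generated mechanically from the run times (a sketch; function and constant names are illustrative):

```python
from datetime import datetime, timedelta

BUFFER = timedelta(minutes=30)
OVERLAP = timedelta(minutes=30)

def intraday_windows(run_times: list[datetime]) -> list[tuple[datetime, datetime]]:
    """Each run's window: (previous run - buffer - overlap) -> (this run - buffer)."""
    return [(prev - BUFFER - OVERLAP, cur - BUFFER)
            for prev, cur in zip(run_times, run_times[1:])]

# Every-6-hours pattern; include the previous day's midnight run for the wrap:
runs = [datetime(2025, 12, 9, h) for h in (0, 6, 12, 18)] + [datetime(2025, 12, 10, 0)]
windows = intraday_windows(runs)
# First window: Dec 8 23:00 -> Dec 9 05:30 (6.5h including the 30-min overlap)
```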


Implementation Guidelines

1. Time Window Calculation

@task
def get_morning_time_window(**context):
    """Calculate the morning refresh time window."""
    import pendulum

    BUFFER_MINUTES = 30
    # data_interval_end is the wall-clock trigger time for a cron schedule;
    # logical_date is one interval earlier, so don't treat it as "today".
    run_time = context["data_interval_end"].in_timezone("America/New_York")

    # Handle Monday (look back across the weekend to Friday)
    lookback_days = 3 if run_time.day_of_week == pendulum.MONDAY else 1
    start_time = run_time.subtract(days=lookback_days).replace(
        hour=16, minute=30, second=0, microsecond=0
    )

    # End BUFFER_MINUTES before this run (8:00 AM - 30 min = 7:30 AM)
    end_time = run_time.subtract(minutes=BUFFER_MINUTES)

    return {
        "start_date": start_time.to_iso8601_string(),
        "end_date": end_time.to_iso8601_string(),
    }

2. Edge Case Handling

# Holidays: Follow the business calendar
# (is_holiday / get_previous_business_day are illustrative helpers)
if is_holiday(logical_date):
    # Look back to the last business day
    last_biz_day = get_previous_business_day(logical_date)
    start_time = last_biz_day.replace(hour=16, minute=30)

# Daylight Saving Time: Use timezone-aware dates
# Pendulum handles DST transitions automatically
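
The same DST guarantee can be checked with the standard library's zoneinfo (shown here instead of pendulum for a self-contained example): timezone-aware arithmetic keeps wall-clock times correct across the US spring-forward transition on 2025-03-09.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

NY = ZoneInfo("America/New_York")

# Monday morning run just after the transition:
run = datetime(2025, 3, 10, 8, 0, tzinfo=NY)                   # Monday, EDT
start = (run - timedelta(days=3)).replace(hour=16, minute=30)  # Friday, still EST
end = run.replace(hour=7, minute=30)                           # Monday, EDT
# Wall-clock times stay 16:30 and 07:30; only the UTC offsets differ.
```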

3. Data Quality Validation

@task
def validate_time_window(time_window):
    """Ensure the time window is reasonable."""
    import pendulum

    start = pendulum.parse(time_window["start_date"])
    end = pendulum.parse(time_window["end_date"])

    duration_hours = (end - start).in_hours()

    # Morning should be ~15 hours, evening ~9 hours
    if not (6 <= duration_hours <= 20):
        raise ValueError(f"Unexpected duration: {duration_hours}h")

    return time_window

4. Snowflake Deduplication

# In YAML config
snowflake:
  target_table: SPROUT_MESSAGES
  deduplication:
    strategy: multi_field
    unique_columns: [GUID, UPDATED_AT]
    order_by: [UPDATED_AT DESC]
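
A toy sketch of what the multi_field strategy does (the real loader's behavior may differ): keep one row per unique-column tuple, preferring the latest UPDATED_AT.

```python
# Keep the last row seen when sorted ascending by the order key, i.e. the
# newest row per key tuple (matching "order_by: UPDATED_AT DESC", rank 1).
def deduplicate(rows: list[dict], unique_columns: list[str], order_key: str) -> list[dict]:
    keep: dict = {}
    for row in sorted(rows, key=lambda r: r[order_key]):
        keep[tuple(row[c] for c in unique_columns)] = row
    return list(keep.values())

rows = [
    {"GUID": "m1", "UPDATED_AT": "2025-12-09T07:15:00"},
    {"GUID": "m1", "UPDATED_AT": "2025-12-09T07:15:00"},  # overlap duplicate
    {"GUID": "m1", "UPDATED_AT": "2025-12-09T16:40:00"},  # later edit, new key
]
deduped = deduplicate(rows, ["GUID", "UPDATED_AT"], "UPDATED_AT")
```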

Real-World Examples

Example 1: Warby Parker Gladly (Current Implementation)

Pattern: Daily batch at 7 AM EST

@dag(schedule="0 7 * * 1-5", catchup=False)
def warbyparker_daily_operations():
    """Pull the full previous day: yesterday 00:00 → 23:59."""

Why: Gladly data is relatively stable after day ends.

Example 2: Fivetran Stripe Connector

Pattern: Hourly incremental with 5-minute buffer

# Conceptual - Fivetran's actual implementation
schedule="0 * * * *"  # Every hour
start = last_sync_time
end = current_time - 5 minutes

Why: Financial data needs to be near real-time.

Example 3: Stitch Salesforce Connector

Pattern: Every 30 minutes with 1-minute overlap

schedule="*/30 * * * *"
start = last_sync_time - 1 minute
end = current_time

Why: CRM updates happen continuously throughout day.


Consequences

Positive

  • ✅ Clear operational model for intraday refreshes
  • ✅ Scalable to any N refreshes per day
  • ✅ Resilient to edge cases and failures
  • ✅ Easy to monitor and debug
  • ✅ Future-proof for new clients with different patterns

Negative

  • ❌ More DAG files to manage (but shared task code)
  • ❌ Slight API inefficiency (pulling duplicate data in overlap)
  • ❌ Requires Snowflake MERGE capability (already have)

Neutral

  • ⚪ Requires clear documentation of time window logic
  • ⚪ Need to educate team on overlap rationale

Alternatives Not Considered

  • Event-driven triggers: Sprout API doesn't support webhooks
  • Change Data Capture (CDC): Not available for SaaS APIs
  • Lambda/Cloud Functions: Want orchestration in Airflow for consistency

References

  • ADR 001: Load Strategy Terminology (defines full_refresh vs incremental)
  • Future: ADR on SLA monitoring for intraday pipelines


Decision Maker

Engineering Team