ADR 002: Intraday Refresh Patterns¶
Status: Proposed
Date: 2025-12-09
Context: Condé Nast Sprout Social pipeline design
Problem Statement¶
How should we handle multiple daily refreshes (intraday patterns) for APIs that return updated records within a time window?
Specific Requirements:
- Condé Nast needs 2 refreshes daily (8 AM EST and 5 PM EST)
- Sprout API returns messages created/updated within a time window
- Need to avoid data loss during transition periods
- Future clients may need 4x/day (every 6 hours) or hourly patterns
Decision¶
Use separate DAGs with overlapping time windows and idempotent deduplication in Snowflake.
Pattern Structure¶
Refresh N: (previous window end - overlap) → (current run time - buffer)
Morning DAG: Yesterday 16:30 → Today 07:30 (15h window)
Evening DAG: Today 07:00 → Today 16:30 (9.5h window; 30min overlap at start)
Key Principles¶
- Overlapping Windows: 30-minute overlap between consecutive refreshes
- Buffers: 30-minute buffer before each run time to avoid incomplete data
- Separate DAGs: One DAG per refresh time (not branching within single DAG)
- Idempotent Loads: Snowflake MERGE on unique key handles duplicates
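The window arithmetic implied by these principles can be sketched in a few lines (illustrative only, with fixed naive datetimes; the real DAGs derive these values from the Airflow logical date):

```python
from datetime import datetime, timedelta

BUFFER = timedelta(minutes=30)   # end each window 30 min before the run time
OVERLAP = timedelta(minutes=30)  # start each window 30 min before the previous end

def refresh_window(run_time, previous_end):
    """Refresh N: (previous end - overlap) -> (current run time - buffer)."""
    return previous_end - OVERLAP, run_time - BUFFER

# Evening run at 17:00 follows a morning window that ended at 07:30:
start, end = refresh_window(datetime(2025, 12, 9, 17, 0), datetime(2025, 12, 9, 7, 30))
# start = 07:00, end = 16:30 -- the evening window described above
```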
Options Considered¶
Option 1: Separate DAGs with Overlapping Windows ✅ SELECTED¶
Implementation:
# condenast_sprout_morning_refresh_dag.py
from airflow.decorators import dag

@dag(schedule="0 8 * * 1-5", catchup=False)
def condenast_sprout_morning_refresh():
    """Morning: Yesterday 4:30 PM → Today 7:30 AM"""
    ...  # shared extract/load tasks

# condenast_sprout_evening_refresh_dag.py
from airflow.decorators import dag

@dag(schedule="0 17 * * 1-5", catchup=False)
def condenast_sprout_evening_refresh():
    """Evening: Today 7:00 AM → Today 4:30 PM"""
    ...  # shared extract/load tasks
Pros:
- ✅ Clear separation - easy to monitor each refresh independently
- ✅ Simple to understand - explicit time windows
- ✅ Easy to modify one refresh without affecting the other
- ✅ No complex branching logic
- ✅ Independent SLAs and alerting per refresh
- ✅ Scales to N refreshes (just add more DAGs)

Cons:
- ❌ Code duplication (minimal - shared tasks in xo-foundry)
- ❌ Multiple DAGs to maintain (but clear ownership)
Option 2: Single DAG with Branching¶
Implementation:
# Conceptual: a plain list of cron strings is not a standard Airflow
# schedule; two run times would need a multi-cron timetable
@dag(schedule=["0 8 * * 1-5", "0 17 * * 1-5"])
def condenast_sprout_dual_refresh():
    @task.branch
    def determine_refresh_type(**context):
        if context["logical_date"].hour == 8:
            return "morning_window"
        return "evening_window"
Pros:
- ✅ Single DAG reduces proliferation

Cons:
- ❌ Branching logic adds complexity
- ❌ Harder to debug which refresh failed
- ❌ Less clear in Airflow UI
- ❌ Difficult to set different SLAs per refresh
- ❌ Doesn't scale well to 4+ refreshes per day
Option 3: Single DAG with Non-Overlapping Windows¶
Implementation:
# Morning: Yesterday 5:00 PM → Today 8:00 AM (exact cutoff)
# Evening: Today 8:00 AM → Today 5:00 PM (exact cutoff)
Pros:
- ✅ No duplicate data pulled from API

Cons:
- ❌ DATA LOSS RISK: Records created/updated at 8:00:00 AM could be missed
- ❌ If morning job fails, gap in data coverage
- ❌ Assumes API returns perfectly consistent data at cutoff times
- ❌ NOT RECOMMENDED for APIs returning "updated since" data
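The cutoff risk is concrete: if one window selects strictly before the cutoff and the next strictly after it, a record stamped exactly at the boundary is never selected (toy illustration, not the actual extractor logic):

```python
# Records keyed by their updated-at timestamp (HH:MM:SS strings sort correctly)
records = ["07:59:59", "08:00:00", "08:00:01"]
cutoff = "08:00:00"

morning = [r for r in records if r < cutoff]   # strictly before the cutoff
evening = [r for r in records if r > cutoff]   # strictly after the cutoff

# "08:00:00" falls in neither window -> silent data loss
```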
Option 4: Continuous Streaming (Airflow Sensors)¶
Implementation:
from datetime import timedelta

from airflow.decorators import dag
from airflow.sensors.time_delta import TimeDeltaSensor

@dag(schedule=None, catchup=False)
def condenast_sprout_streaming():
    wait = TimeDeltaSensor(task_id="wait", delta=timedelta(minutes=30))
    extract = extract_sprout_messages()
    wait >> extract
Pros:
- ✅ Most real-time data

Cons:
- ❌ Overkill for this use case
- ❌ Complex sensor management
- ❌ Higher infrastructure costs
- ❌ API rate limiting concerns
- ❌ Harder to reason about data completeness
Rationale for Decision¶
Why Separate DAGs?¶
- Operational Clarity
  - Each refresh appears as a distinct pipeline in the Airflow UI
  - Easy to see "morning refresh succeeded, evening failed"
  - Clear ownership and responsibility
- Independent Monitoring
  - Set different SLAs: morning must complete by 9 AM, evening by 6 PM
  - Separate alerting: page for morning failures, email for evening
  - Track success rates independently
- Flexible Evolution
  - Easy to add a 3rd refresh (e.g., noon) without changing existing DAGs
  - Can disable one refresh without affecting others
  - Simple to modify buffer/overlap per refresh
- Testing & Backfills
  - Can backfill morning refreshes independently
  - Test evening refresh logic without running the morning
  - Easier to reason about state
Why Overlapping Windows?¶
- Data Completeness Guarantee
  - The API may have eventual consistency
  - Records updated at 8:00:00 AM could appear in either query
  - A 30-minute overlap catches these edge cases
- Failure Resilience
  - If the morning job fails, the evening job still captures the overlap period
  - Reduces urgency to immediately fix morning failures
- API Quirks
  - Timezone handling inconsistencies
  - Rounding differences in timestamp comparisons
  - Network timing variations
Idempotent Loading is Key¶
-- Snowflake MERGE handles duplicates gracefully
MERGE INTO bronze.sprout_messages AS target
USING staging.sprout_messages AS source
ON target.guid = source.guid
AND target.updated_at = source.updated_at
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
This makes overlapping windows safe and efficient.
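A toy in-memory analogue shows why the MERGE makes overlap harmless: replaying rows already loaded is a no-op on the (guid, updated_at) key (sketch only; real loads go through the Snowflake MERGE above):

```python
def merge_rows(target, rows):
    """Upsert rows keyed on (guid, updated_at), mimicking the MERGE semantics."""
    for row in rows:
        target[(row["guid"], row["updated_at"])] = row  # matched -> update, else insert
    return target

morning = [{"guid": "m1", "updated_at": "07:05", "text": "hi"}]
evening = [{"guid": "m1", "updated_at": "07:05", "text": "hi"},   # re-pulled in the overlap
           {"guid": "m2", "updated_at": "09:00", "text": "yo"}]

table = merge_rows({}, morning)
table = merge_rows(table, evening)  # the overlap duplicate collapses; 2 rows remain
```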
Scaling Patterns¶
2 Refreshes/Day (Condé Nast)¶
Morning: Yesterday 16:30 → Today 07:30 (15h)
Evening: Today 07:00 → Today 16:30 (9.5h)
Overlap: 30 minutes
4 Refreshes/Day (Every 6 Hours)¶
Refresh 1 (06:00): Yesterday 23:00 → Today 05:30 (6.5h)
Refresh 2 (12:00): Today 05:00 → Today 11:30 (6.5h)
Refresh 3 (18:00): Today 11:00 → Today 17:30 (6.5h)
Refresh 4 (00:00): Today 17:00 → Today 23:30 (6.5h)
Overlap: 30 minutes each; buffer: 30 minutes
24 Refreshes/Day (Hourly)¶
Pattern: Always use (previous window end - overlap) → (current run time - buffer)
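A generic generator for N evenly spaced refreshes per day, applying the same buffer/overlap rule, might look like this (illustrative sketch; the function name and defaults are assumptions, not existing xo-foundry code):

```python
from datetime import datetime, timedelta

def daily_windows(day, refreshes,
                  buffer=timedelta(minutes=30), overlap=timedelta(minutes=30)):
    """Return (start, end) pairs for N evenly spaced refreshes after midnight of `day`.

    Each window ends `buffer` before its run time and starts `overlap`
    before the previous window's end.
    """
    step = timedelta(days=1) / refreshes
    windows = []
    for i in range(refreshes):
        run_time = day + step * (i + 1)   # runs at step, 2*step, ...
        end = run_time - buffer
        start = end - step - overlap      # previous window's end, minus the overlap
        windows.append((start, end))
    return windows

# daily_windows(datetime(2025, 12, 9), 4) reproduces the 4x/day table above
```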
Implementation Guidelines¶
1. Time Window Calculation¶
@task
def get_morning_time_window(**context):
    """Calculate the morning refresh time window."""
    import pendulum

    logical_date = context["logical_date"].in_timezone("America/New_York")

    # Start at 4:30 PM of the previous business day (skip the weekend on Monday)
    if logical_date.day_of_week == pendulum.MONDAY:
        start_time = logical_date.subtract(days=3).replace(hour=16, minute=30, second=0)
    else:
        start_time = logical_date.subtract(days=1).replace(hour=16, minute=30, second=0)

    # End 30 min before this run (8:00 AM run -> 7:30 AM cutoff)
    end_time = logical_date.replace(hour=7, minute=30, second=0)

    return {
        "start_date": start_time.to_iso8601_string(),
        "end_date": end_time.to_iso8601_string(),
    }
2. Edge Case Handling¶
# Holidays: follow the business calendar
if is_holiday(logical_date):
    # Look back to the last business day
    last_biz_day = get_previous_business_day(logical_date)
    start_time = last_biz_day.replace(hour=16, minute=30)

# Daylight Saving Time: use timezone-aware dates
# (Pendulum handles DST transitions automatically)
3. Data Quality Validation¶
@task
def validate_time_window(time_window):
    """Ensure the time window is reasonable."""
    import pendulum

    start = pendulum.parse(time_window["start_date"])
    end = pendulum.parse(time_window["end_date"])
    duration_hours = (end - start).in_hours()

    # Morning should be ~15 hours, evening ~9.5 hours
    if not (6 <= duration_hours <= 20):
        raise ValueError(f"Unexpected duration: {duration_hours}h")
    return time_window
4. Snowflake Deduplication¶
# In YAML config
snowflake:
  target_table: SPROUT_MESSAGES
  deduplication:
    strategy: multi_field
    unique_columns: [GUID, UPDATED_AT]
    order_by: [UPDATED_AT DESC]
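At load time, a config like this could be rendered into a Snowflake QUALIFY dedup clause roughly as follows (a sketch of the idea only; `dedup_sql` and its signature are invented here, not the actual loader API):

```python
def dedup_sql(table, unique_columns, order_by):
    """Render a query keeping one row per unique key, preferring order_by's first row."""
    keys = ", ".join(unique_columns)
    order = ", ".join(order_by)
    return (
        f"SELECT * FROM {table} "
        f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {keys} ORDER BY {order}) = 1"
    )

sql = dedup_sql("staging.SPROUT_MESSAGES", ["GUID", "UPDATED_AT"], ["UPDATED_AT DESC"])
```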
Real-World Examples¶
Example 1: Warby Parker Gladly (Current Implementation)¶
Pattern: Daily batch at 7 AM EST
@dag(schedule="0 7 * * 1-5")
def warbyparker_daily_operations():
    ...  # pull the full previous day: yesterday 00:00 to 23:59
Why: Gladly data is relatively stable after day ends.
Example 2: Fivetran Stripe Connector¶
Pattern: Hourly incremental with 5-minute buffer
# Conceptual - Fivetran's actual implementation
schedule = "0 * * * *"          # every hour
start = last_sync_time          # pseudocode
end = current_time - 5 minutes  # pseudocode: 5-minute buffer
Why: Financial data needs to be near real-time.
Example 3: Stitch Salesforce Connector¶
Pattern: Every 30 minutes with 1-minute overlap
Why: CRM updates happen continuously throughout day.
Consequences¶
Positive¶
- ✅ Clear operational model for intraday refreshes
- ✅ Scalable to any N refreshes per day
- ✅ Resilient to edge cases and failures
- ✅ Easy to monitor and debug
- ✅ Future-proof for new clients with different patterns
Negative¶
- ❌ More DAG files to manage (but shared task code)
- ❌ Slight API inefficiency (pulling duplicate data in overlap)
- ❌ Requires Snowflake MERGE capability (already have)
Neutral¶
- ⚪ Requires clear documentation of time window logic
- ⚪ Need to educate team on overlap rationale
Alternatives Not Considered¶
- Event-driven triggers: Sprout API doesn't support webhooks
- Change Data Capture (CDC): Not available for SaaS APIs
- Lambda/Cloud Functions: Want orchestration in Airflow for consistency
References¶
- Airflow Best Practices - Data Intervals
- Fivetran Incremental Sync Strategy
- ADR 001: Load Strategy Terminology
- Gladly extractor implementation:
packages/xo-foundry/dags/warbyparker_daily_operations_dag.py
Decision Maker¶
Engineering Team
Related Decisions¶
- ADR 001: Load Strategy Terminology (defines full_refresh vs incremental)
- Future: ADR on SLA monitoring for intraday pipelines