# ELT Pipeline Flow
This document describes the standard Extract → Stage → Load → Transform (ELT) pattern used throughout the XO-Data platform.
## Overview

All data pipelines in XO-Data follow a consistent four-stage flow: Extract → Stage → Load → Transform.

## Pipeline Stages

### Stage 1: Extract
Purpose: Pull raw data from source systems and store in S3 Ingest bucket.
Location: `packages/xo-foundry/src/xo_foundry/tasks/extract_tasks.py`
Sources:
- Gladly API reports (conversations, contacts, agents, work sessions)
- Sprout Social API (messages, activity)
- Gmail attachments
- Google Sheets
- S3 file uploads
Output: Raw CSV files in S3 Ingest bucket with original column names
**No pandas in extraction**

All extraction uses native Python `csv.DictWriter`. Pandas can corrupt raw data through automatic type inference (IDs become floats, leading zeros are stripped). Let Snowflake handle type conversion instead.
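A minimal stdlib sketch of why this matters: `csv.DictWriter` writes values exactly as given, so string IDs with leading zeros survive untouched (illustrative data, not a real extract):

```python
import csv
import io

# Rows as a source API might return them -- IDs are strings with leading zeros.
rows = [
    {"CONTACT_ID": "00123", "EVENT_TYPE": "OPENED"},
    {"CONTACT_ID": "00456", "EVENT_TYPE": "CLOSED"},
]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["CONTACT_ID", "EVENT_TYPE"])
writer.writeheader()
writer.writerows(rows)

# Values are written verbatim: leading zeros survive, nothing becomes a float.
print(buf.getvalue())
```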
Example:
```python
from xo_foundry.tasks.extract_tasks import extract_gladly_data

# Airflow task -- extracts to S3 Ingest
ingest_path = extract_gladly_data(
    metric_set="ContactTimestampsReport",
    domain="warbyparker",
    report_name="contact_timestamps",
)
# Returns: "warbyparker/contact_timestamps/2026-01-15/contact_timestamps_2026-01-15.csv"
```
S3 Path Pattern: `{domain}/{report_name}/{date}/{report_name}_{date}.csv`
### Stage 2: Stage
Purpose: Copy to Stage bucket, standardize column names, generate Snowflake column mappings.
Pattern: Copy-then-Peek (see below)
Operations:
- Copy file from Ingest → Stage bucket (S3-to-S3, fast)
- Read first 8KB to extract headers (range request)
- Standardize column names (UPPERCASE, replace special chars)
- Generate column mapping for COPY INTO
Output: Staged file in S3 Stage bucket with mapping metadata
Example:
```python
from xo_foundry.tasks.stage_tasks import copy_and_standardize

stage_info = copy_and_standardize(
    ingest_path=ingest_path,
    domain="warbyparker",
    report_name="contact_timestamps",
    load_strategy="full_refresh",
)
# Returns: {"path": "s3://xo-stage/...", "headers": ["CONTACT_ID", ...]}
```
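The real rules live inside `copy_and_standardize`; as an illustration, a standardizer of the shape described above (the function name is hypothetical) might look like:

```python
import re

def standardize_column(name: str) -> str:
    """Uppercase a header and replace runs of special characters with underscores."""
    cleaned = re.sub(r"[^A-Za-z0-9]+", "_", name.strip())
    return cleaned.strip("_").upper()

headers = ["Contact ID", "timestamp-field", "Event Type (raw)"]
print([standardize_column(h) for h in headers])
# ['CONTACT_ID', 'TIMESTAMP_FIELD', 'EVENT_TYPE_RAW']
```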
S3 Path Pattern (includes load strategy): `{domain}/{report_name}/{load_strategy}/{date}/...`
### Stage 3: Load
Purpose: Load staged files into Snowflake BRONZE layer using TRUNCATE + COPY INTO.
Method: Transaction-wrapped TRUNCATE + COPY INTO with FORCE = TRUE (ADR 006)
Operations:
- Begin transaction
- TRUNCATE Bronze table (clean slate)
- COPY INTO with column mapping and FORCE = TRUE
- Inject metadata columns (`RECORD_KEY`, `RECORD_HASH`, `DATE_TO_WAREHOUSE`, `SOURCE_FILE`, `BATCH_ID`, `PIPELINE_RUN_ID`)
- Commit transaction
Output: Raw data in BRONZE layer tables (all VARCHAR + metadata columns)
SQL Pattern:
```sql
BEGIN TRANSACTION;

TRUNCATE TABLE WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS;

COPY INTO WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS
FROM (
    SELECT
        $1 AS CONTACT_ID,
        $2 AS TIMESTAMP_FIELD,
        $3 AS EVENT_TYPE,
        ...
    FROM @xo_stage/warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv
)
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
FORCE = TRUE
ON_ERROR = 'ABORT_STATEMENT';

COMMIT;
```
Python Task:
```python
from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake

table_name = copy_to_snowflake(
    stage_path=stage_info["path"],
    headers=stage_info["headers"],
    database="WBP_DB_DEV",
    schema="BRONZE",
    table="GLADLY_CONTACT_TIMESTAMPS",
)
```
**Why TRUNCATE + FORCE?**
- TRUNCATE ensures idempotent loading (same result every run)
- FORCE = TRUE bypasses Snowflake's 14-day copy history constraint
- Transaction makes the operation atomic (rollback on failure)
- Historical data is preserved in Silver, not Bronze
### Stage 4: Transform
Purpose: Transform BRONZE → SILVER → GOLD using dbt.
BRONZE → SILVER:
- Type conversions (VARCHAR → INT, TIMESTAMP, etc.)
- Data quality validations
- Deduplication on unique keys
- Historical preservation (Silver keeps what Bronze discards)
SILVER → GOLD (four types):
- Facts (`fct_`): Enrich with roster/glossary JOINs, filter to XO agents
- Dimensions (`dim_`): Current-state reference entities
- Aggregates (`agg_`): Pre-aggregated metrics (GROUP BY)
- Reports (`rpt_`): Consumption views (zero storage)
Example dbt flow:
```sql
-- models/silver/contact_timestamps.sql
SELECT
    CONTACT_ID::VARCHAR AS CONTACT_ID,
    TO_TIMESTAMP(TIMESTAMP_FIELD) AS TIMESTAMP_FIELD,
    UPPER(TRIM(EVENT_TYPE)) AS EVENT_TYPE,
    ...
FROM {{ source('bronze', 'gladly_contact_timestamps') }}
WHERE CONTACT_ID IS NOT NULL
```

```sql
-- models/gold/fct_contacts.sql
SELECT
    ct.*, r.AGENT_NAME, r.TEAM
FROM {{ ref('contact_timestamps') }} ct
LEFT JOIN {{ source('core_silver', 'roster_warbyparker') }} r
    ON ct.TARGET_AGENT_ID = r.AGENT_ID
WHERE r.AGENT_NAME IS NOT NULL
```

```sql
-- models/gold/agg_agent_daily.sql
SELECT
    DATE_TRUNC('day', CREATED_AT) AS DATE,
    AGENT_NAME,
    COUNT(DISTINCT CONTACT_ID) AS TOTAL_CONTACTS
FROM {{ ref('fct_contacts') }}
GROUP BY 1, 2
```
## Load Strategies
Per ADR 001, three load strategies determine S3 path structure and loading behavior:
| Strategy | S3 Path | Bronze Loading | Use Case |
|---|---|---|---|
| `full_refresh` | `{report}/full_refresh/{date}/` | TRUNCATE + COPY | Most common (Gladly) |
| `incremental` | `{report}/incremental/{timestamp}/` | TRUNCATE + COPY | Google Sheets |
| `historical` | `{report}/historical/{date}/` | TRUNCATE + COPY | Late-arriving data |
**All strategies truncate Bronze**
Regardless of load strategy, Bronze tables are always truncated before loading. The load strategy affects S3 path organization and how the time window is calculated, but the Bronze loading pattern is always TRUNCATE + COPY INTO.
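As a sketch, a strategy-aware Stage key of the shape shown above could be built like this (the helper name is illustrative, and the `data.csv` filename follows the staged-file examples in this document, not a documented rule):

```python
from datetime import date

def build_stage_key(domain: str, report: str, strategy: str, run_date: date) -> str:
    """Build an S3 Stage key: {domain}/{report}/{strategy}/{date}/data.csv"""
    d = run_date.isoformat()
    return f"{domain}/{report}/{strategy}/{d}/data.csv"

print(build_stage_key("warbyparker", "contact_timestamps", "full_refresh", date(2026, 1, 15)))
# warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv
```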
## Time Windows
Centralized time window calculation supports three modes:
```yaml
# Daily: extract yesterday's data
time_window:
  refresh_type: daily
  lag: { days: 1 }
  timezone: "America/New_York"
```

```yaml
# Intraday Relative: 8-hour lookback
time_window:
  refresh_type: intraday
  lag: { minutes: 30 }
  lookback: "8 hours"
  timezone: "America/New_York"
```

```yaml
# Intraday Absolute: fixed window
time_window:
  refresh_type: intraday
  start_time: "2026-01-15T17:00:00"
  end_time: "2026-01-16T09:00:00"
  timezone: "America/New_York"
```
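A simplified sketch of the daily mode (illustrative only; the actual calculation is centralized in xo-foundry, and the function name here is hypothetical):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def daily_window(now: datetime, lag_days: int, tz: str) -> tuple[datetime, datetime]:
    """Return the [start, end) window covering the full day `lag_days` before `now`."""
    local_now = now.astimezone(ZoneInfo(tz))
    target = (local_now - timedelta(days=lag_days)).date()
    start = datetime.combine(target, datetime.min.time(), tzinfo=ZoneInfo(tz))
    return start, start + timedelta(days=1)

# A 06:50 ET run with lag of 1 day extracts all of the previous day.
now = datetime(2026, 1, 16, 6, 50, tzinfo=ZoneInfo("America/New_York"))
start, end = daily_window(now, lag_days=1, tz="America/New_York")
print(start.isoformat(), end.isoformat())
```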
## Metadata Columns
All Bronze tables include six standard metadata columns per ADR 004:
| Column | Purpose |
|---|---|
| `RECORD_KEY` | Deduplication key (raw value or composite hash) |
| `RECORD_HASH` | NULL-safe hash of ALL data columns for change detection |
| `DATE_TO_WAREHOUSE` | Timestamp when data was loaded |
| `SOURCE_FILE` | Source file name for lineage |
| `BATCH_ID` | Batch identifier |
| `PIPELINE_RUN_ID` | Airflow run identifier |
## JSON Column Handling
When a source has JSON columns (e.g., Sprout API's activity_metadata), specify them in YAML config:
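A hypothetical shape for that config (the `json_columns` key name and surrounding structure are assumptions, not the documented schema):

```yaml
# Illustrative sketch only -- the actual key names may differ in your pipeline YAML
report:
  name: sprout_activity
  json_columns:
    - activity_metadata
```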
This enables JSON normalization in RECORD_HASH calculations, preventing phantom updates from JSON key ordering differences.
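One common way to normalize, sketched here for illustration, is to re-serialize with sorted keys before hashing:

```python
import hashlib
import json

def normalize_json(raw: str) -> str:
    """Re-serialize JSON with sorted keys so key order doesn't change the hash."""
    return json.dumps(json.loads(raw), sort_keys=True, separators=(",", ":"))

a = '{"likes": 2, "shares": 1}'
b = '{"shares": 1, "likes": 2}'   # same data, different key order

# Normalized forms are identical, so the hash is stable -- no phantom updates.
print(hashlib.sha256(normalize_json(a).encode()).hexdigest()[:12])
```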
## Copy-then-Peek Pattern

### Problem
Traditional S3 → Snowflake pipelines download the entire file to read headers:
```python
import pandas as pd

# Slow for large files (downloads the entire 10 GB file just to read headers)
df = pd.read_csv("s3://bucket/large-file.csv")
headers = df.columns.tolist()
```
### Solution
Copy-then-Peek avoids downloading by:
- Copying file within S3 (fast, S3-to-S3)
- Reading only first 8KB using range request
```python
from xo_foundry.s3_utils import copy_and_peek_s3_file

headers = copy_and_peek_s3_file(
    source_bucket="xo-ingest",
    source_key="warbyparker/contact_timestamps/2026-01-15/data.csv",
    dest_bucket="xo-stage",
    dest_key="warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv",
    peek_bytes=8192,
)
```
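Parsing headers out of the peeked bytes needs no further S3 access; a pure-Python sketch (the helper name is illustrative, not the xo-foundry API):

```python
import csv
import io

def headers_from_peek(peek: bytes, encoding: str = "utf-8") -> list[str]:
    """Extract CSV headers from the first bytes of a file (e.g. an 8 KB range request)."""
    first_line = peek.split(b"\n", 1)[0].decode(encoding).rstrip("\r")
    return next(csv.reader(io.StringIO(first_line)))

peek = b'CONTACT_ID,"TIMESTAMP_FIELD",EVENT_TYPE\r\n00123,2026-01-15T00:00:00,OPENED\r\n'
print(headers_from_peek(peek))
# ['CONTACT_ID', 'TIMESTAMP_FIELD', 'EVENT_TYPE']
```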
### Performance Benefits
| File Size | Traditional | Copy-then-Peek | Improvement |
|---|---|---|---|
| 10 MB | 5 seconds | 0.5 seconds | 10x faster |
| 1 GB | 120 seconds | 0.5 seconds | 240x faster |
| 10 GB | 1200 seconds | 0.5 seconds | 2400x faster |
**Key Insight:** Header extraction time is constant (~0.5s) regardless of file size.

## Airflow Pipeline Example
A complete pipeline using xo-foundry tasks with Airflow 3.0 TaskFlow API:
```python
from airflow.decorators import dag, task
from xo_foundry.tasks.extract_tasks import extract_gladly_data
from xo_foundry.tasks.stage_tasks import copy_and_standardize
from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake
from datetime import datetime

@dag(
    schedule="50 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def warbyparker_gladly_daily_dag():
    """Extract Gladly reports → S3 → Snowflake BRONZE"""
    # Extract from Gladly API (csv.DictWriter, not pandas)
    ingest = extract_gladly_data(
        metric_set="ContactTimestampsReport",
        domain="warbyparker",
        report_name="contact_timestamps",
    )

    # Stage: Copy-then-Peek, standardize columns
    stage = copy_and_standardize(ingest)

    # Load: TRUNCATE + COPY INTO Bronze
    load = copy_to_snowflake(stage)

warbyparker_gladly_daily_dag()
```
## Error Handling

### Extract Failures
```python
import logging
from datetime import timedelta

from airflow.decorators import task

logger = logging.getLogger(__name__)

@task(retries=3, retry_delay=timedelta(minutes=5))
def extract_gladly_data(**context):
    try:
        # ... extraction logic
        return ingest_path
    except Exception as e:
        logger.error(f"Gladly API error: {e}")
        raise
```
### Load Failures
The TRUNCATE + COPY INTO pattern is wrapped in a transaction. If COPY INTO fails, the TRUNCATE is rolled back, preserving the previous data.
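A sketch of such a wrapper against a generic DB-API cursor (illustrative, not the actual xo-foundry implementation; the fake cursor exists only to demonstrate the rollback path):

```python
def truncate_and_copy(cursor, table: str, copy_sql: str) -> None:
    """Run TRUNCATE + COPY INTO atomically; roll back the TRUNCATE if the copy fails."""
    cursor.execute("BEGIN TRANSACTION")
    try:
        cursor.execute(f"TRUNCATE TABLE {table}")
        cursor.execute(copy_sql)
        cursor.execute("COMMIT")
    except Exception:
        cursor.execute("ROLLBACK")  # previous Bronze data is preserved
        raise

# Minimal fake cursor to exercise both paths without a live connection.
class FakeCursor:
    def __init__(self, fail_on: str = ""):
        self.log, self.fail_on = [], fail_on
    def execute(self, sql: str):
        self.log.append(sql)
        if self.fail_on and sql.startswith(self.fail_on):
            raise RuntimeError("copy failed")

ok = FakeCursor()
truncate_and_copy(ok, "BRONZE.T", "COPY INTO BRONZE.T FROM @stage")
print(ok.log[-1])  # COMMIT

bad = FakeCursor(fail_on="COPY")
try:
    truncate_and_copy(bad, "BRONZE.T", "COPY INTO BRONZE.T FROM @stage")
except RuntimeError:
    pass
print(bad.log[-1])  # ROLLBACK
```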
## Monitoring

### Airflow Metrics
- Task duration
- Success/failure rates
- Retry counts
### Snowflake Metrics
```sql
-- Freshness check
SELECT MAX(DATE_TO_WAREHOUSE) AS LATEST_LOAD
FROM WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS;

-- Row counts by layer
SELECT 'BRONZE' AS LAYER, COUNT(*) FROM WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS
UNION ALL
SELECT 'SILVER', COUNT(*) FROM WBP_DB.SILVER.CONTACT_TIMESTAMPS
UNION ALL
SELECT 'GOLD', COUNT(*) FROM WBP_DB.GOLD.fct_contacts;
```
## Next Steps
- Snowflake Medallion Architecture -- Layer details
- ELT Layer Architecture -- Layer responsibilities
- xo-foundry Package -- Task library and DAG Factory
- Naming Conventions -- Standards
- Architecture Decisions -- ADRs