
ELT Pipeline Flow

This document describes the standard Extract → Stage → Load → Transform (ELT) pattern used throughout the XO-Data platform.

Overview

All data pipelines in XO-Data follow a consistent four-stage flow:

Extract → Stage → Load → Transform

Source → S3 Ingest → S3 Stage → Snowflake BRONZE → SILVER → GOLD

Pipeline Stages

Stage 1: Extract

Purpose: Pull raw data from source systems and store in S3 Ingest bucket.

Location: packages/xo-foundry/src/xo_foundry/tasks/extract_tasks.py

Sources:

  • Gladly API reports (conversations, contacts, agents, work sessions)
  • Sprout Social API (messages, activity)
  • Gmail attachments
  • Google Sheets
  • S3 file uploads

Output: Raw CSV files in S3 Ingest bucket with original column names

No pandas in extraction

All extraction uses native Python csv.DictWriter. Pandas corrupts data via automatic type inference (IDs become floats, leading zeros stripped). Let Snowflake handle type conversion.
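The difference is easy to demonstrate with a minimal sketch (hypothetical data, not a real Gladly payload): `csv.DictWriter` writes values verbatim, so IDs with leading zeros survive untouched.

```python
import csv
import io

# Rows as returned by a source API -- note the ID with a leading zero.
rows = [
    {"contact_id": "007123", "event_type": "opened"},
    {"contact_id": "007124", "event_type": "closed"},
]

# csv.DictWriter writes values verbatim; no type inference occurs.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["contact_id", "event_type"])
writer.writeheader()
writer.writerows(rows)

print(buffer.getvalue().splitlines()[1])  # 007123,opened -- leading zero intact
```

Reading the same file back through pandas would infer `contact_id` as an integer and emit `7123`, which is exactly the corruption the rule above guards against.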

Example:

from xo_foundry.tasks.extract_tasks import extract_gladly_data

# Airflow task -- extracts to S3 Ingest
ingest_path = extract_gladly_data(
    metric_set="ContactTimestampsReport",
    domain="warbyparker",
    report_name="contact_timestamps"
)
# Returns: "warbyparker/contact_timestamps/2026-01-15/contact_timestamps_2026-01-15.csv"

S3 Path Pattern:

s3://xo-ingest/{domain}/{report}/{date}/{filename}

Stage 2: Stage

Purpose: Copy to Stage bucket, standardize column names, generate Snowflake column mappings.

Pattern: Copy-then-Peek (see below)

Operations:

  1. Copy file from Ingest → Stage bucket (S3-to-S3, fast)
  2. Read first 8KB to extract headers (range request)
  3. Standardize column names (UPPERCASE, replace special chars)
  4. Generate column mapping for COPY INTO

Output: Staged file in S3 Stage bucket with mapping metadata

Example:

from xo_foundry.tasks.stage_tasks import copy_and_standardize

stage_info = copy_and_standardize(
    ingest_path=ingest_path,
    domain="warbyparker",
    report_name="contact_timestamps",
    load_strategy="full_refresh"
)
# Returns: {"path": "s3://xo-stage/...", "headers": ["CONTACT_ID", ...]}
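The standardization step itself reduces to a small transformation. A sketch, assuming the rules are "uppercase, collapse special characters to underscores" (the exact rules in `stage_tasks` may differ):

```python
import re

def standardize_header(name: str) -> str:
    """Uppercase and replace any non-alphanumeric run with a single underscore."""
    cleaned = re.sub(r"[^A-Za-z0-9]+", "_", name.strip()).strip("_")
    return cleaned.upper()

raw_headers = ["Contact ID", "timestamp (UTC)", "event-type"]
print([standardize_header(h) for h in raw_headers])
# ['CONTACT_ID', 'TIMESTAMP_UTC', 'EVENT_TYPE']
```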

S3 Path Pattern (includes load strategy):

s3://xo-stage/{domain}/{report}/{load_strategy}/{date_or_timestamp}/{filename}

Stage 3: Load

Purpose: Load staged files into Snowflake BRONZE layer using TRUNCATE + COPY INTO.

Method: Transaction-wrapped TRUNCATE + COPY INTO with FORCE = TRUE (ADR 006)

Operations:

  1. Begin transaction
  2. TRUNCATE Bronze table (clean slate)
  3. COPY INTO with column mapping and FORCE = TRUE
  4. Inject metadata columns (RECORD_KEY, RECORD_HASH, DATE_TO_WAREHOUSE, SOURCE_FILE, BATCH_ID, PIPELINE_RUN_ID)
  5. Commit transaction

Output: Raw data in BRONZE layer tables (all VARCHAR + metadata columns)

SQL Pattern:

BEGIN TRANSACTION;

TRUNCATE TABLE WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS;

COPY INTO WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS
FROM (
    SELECT
        $1 AS CONTACT_ID,
        $2 AS TIMESTAMP_FIELD,
        $3 AS EVENT_TYPE,
        ...
    FROM @xo_stage/warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv
)
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)
FORCE = TRUE
ON_ERROR = 'ABORT_STATEMENT';

COMMIT;

Python Task:

from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake

table_name = copy_to_snowflake(
    stage_path=stage_info["path"],
    headers=stage_info["headers"],
    database="WBP_DB_DEV",
    schema="BRONZE",
    table="GLADLY_CONTACT_TIMESTAMPS"
)

Why TRUNCATE + FORCE?

  • TRUNCATE ensures idempotent loading (same result every run)
  • FORCE = TRUE bypasses Snowflake's 14-day copy history constraint
  • Transaction makes the operation atomic (rollback on failure)
  • Historical data is preserved in Silver, not Bronze

Stage 4: Transform

Purpose: Transform BRONZE → SILVER → GOLD using dbt.

BRONZE → SILVER:

  • Type conversions (VARCHAR → INT, TIMESTAMP, etc.)
  • Data quality validations
  • Deduplication on unique keys
  • Historical preservation (Silver keeps what Bronze discards)

SILVER → GOLD (four types):

  • Facts (fct_): Enrich with roster/glossary JOINs, filter to XO agents
  • Dimensions (dim_): Current state reference entities
  • Aggregates (agg_): Pre-aggregated metrics (GROUP BY)
  • Reports (rpt_): Consumption views (zero storage)

Example dbt flow:

-- models/silver/contact_timestamps.sql
SELECT
    CONTACT_ID::VARCHAR AS CONTACT_ID,
    TO_TIMESTAMP(TIMESTAMP_FIELD) AS TIMESTAMP_FIELD,
    UPPER(TRIM(EVENT_TYPE)) AS EVENT_TYPE,
    ...
FROM {{ source('bronze', 'gladly_contact_timestamps') }}
WHERE CONTACT_ID IS NOT NULL

-- models/gold/fct_contacts.sql
SELECT
    ct.*, r.AGENT_NAME, r.TEAM
FROM {{ ref('contact_timestamps') }} ct
LEFT JOIN {{ source('core_silver', 'roster_warbyparker') }} r
    ON ct.TARGET_AGENT_ID = r.AGENT_ID
WHERE r.AGENT_NAME IS NOT NULL

-- models/gold/agg_agent_daily.sql
SELECT
    DATE_TRUNC('day', CREATED_AT) AS DATE,
    AGENT_NAME,
    COUNT(DISTINCT CONTACT_ID) AS TOTAL_CONTACTS
FROM {{ ref('fct_contacts') }}
GROUP BY 1, 2

Load Strategies

Per ADR 001, three load strategies determine S3 path structure and loading behavior:

| Strategy     | S3 Path                           | Bronze Loading  | Use Case             |
|--------------|-----------------------------------|-----------------|----------------------|
| full_refresh | {report}/full_refresh/{date}/     | TRUNCATE + COPY | Most common (Gladly) |
| incremental  | {report}/incremental/{timestamp}/ | TRUNCATE + COPY | Google Sheets        |
| historical   | {report}/historical/{date}/       | TRUNCATE + COPY | Late-arriving data   |

All strategies truncate Bronze

Regardless of load strategy, Bronze tables are always truncated before loading. The load strategy affects S3 path organization and how the time window is calculated, but the Bronze loading pattern is always TRUNCATE + COPY INTO.
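The strategy-dependent part is therefore just the stage key. A sketch of the path convention with a hypothetical helper (name and exact timestamp format are illustrative): incremental runs partition by full timestamp so multiple intraday loads do not collide, while the other strategies partition by date.

```python
from datetime import datetime

def build_stage_key(domain: str, report: str, load_strategy: str,
                    run_time: datetime, filename: str) -> str:
    """Build the S3 stage key; only the partition component varies by strategy."""
    if load_strategy == "incremental":
        partition = run_time.strftime("%Y-%m-%dT%H-%M-%S")  # per-run timestamp
    else:  # full_refresh or historical
        partition = run_time.strftime("%Y-%m-%d")           # per-day date
    return f"{domain}/{report}/{load_strategy}/{partition}/{filename}"

key = build_stage_key("warbyparker", "contact_timestamps", "full_refresh",
                      datetime(2026, 1, 15, 6, 50), "data.csv")
print(key)  # warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv
```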

Time Windows

Centralized time window calculation supports three modes:

# Daily: extract yesterday's data
time_window:
  refresh_type: daily
  lag: { days: 1 }
  timezone: "America/New_York"

# Intraday Relative: 8-hour lookback
time_window:
  refresh_type: intraday
  lag: { minutes: 30 }
  lookback: "8 hours"
  timezone: "America/New_York"

# Intraday Absolute: fixed window
time_window:
  refresh_type: intraday
  start_time: "2026-01-15T17:00:00"
  end_time: "2026-01-16T09:00:00"
  timezone: "America/New_York"
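The daily mode above can be sketched in a few lines: apply the lag in the configured timezone, then take the full local day as the window. This is an illustrative implementation, not the platform's actual time-window module.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def daily_window(run_time: datetime, lag_days: int, tz: str):
    """Daily mode: the window covers the full local day `lag_days` before run_time."""
    local = run_time.astimezone(ZoneInfo(tz))
    target = (local - timedelta(days=lag_days)).date()
    start = datetime.combine(target, datetime.min.time(), ZoneInfo(tz))
    return start, start + timedelta(days=1)

# A 06:50 ET run on Jan 16 with lag {days: 1} extracts all of Jan 15.
start, end = daily_window(
    datetime(2026, 1, 16, 6, 50, tzinfo=ZoneInfo("America/New_York")),
    lag_days=1, tz="America/New_York",
)
print(start.date(), end.date())  # 2026-01-15 2026-01-16
```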

Metadata Columns

All Bronze tables include six standard metadata columns per ADR 004:

| Column            | Purpose                                                 |
|-------------------|---------------------------------------------------------|
| RECORD_KEY        | Deduplication key (raw value or composite hash)         |
| RECORD_HASH       | NULL-safe hash of ALL data columns for change detection |
| DATE_TO_WAREHOUSE | Timestamp when data was loaded                          |
| SOURCE_FILE       | Source file name for lineage                            |
| BATCH_ID          | Batch identifier                                        |
| PIPELINE_RUN_ID   | Airflow run identifier                                  |
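"NULL-safe" means a NULL and an empty string must hash differently, and adjacent columns must not be able to collide. A minimal sketch of the idea (sentinel and separator values are illustrative; the platform's actual RECORD_HASH implementation may differ):

```python
import hashlib

NULL_SENTINEL = "\x00NULL\x00"  # distinguishes NULL from the empty string
SEPARATOR = "\x1f"              # unit separator: ("a","b") can't collide with ("ab","")

def record_hash(values: list) -> str:
    """NULL-safe hash over all data columns, taken in a fixed column order."""
    parts = [NULL_SENTINEL if v is None else str(v) for v in values]
    return hashlib.sha256(SEPARATOR.join(parts).encode("utf-8")).hexdigest()

# A NULL and an empty string hash differently, so the change is detected:
print(record_hash(["c-1", None]) != record_hash(["c-1", ""]))  # True
```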

JSON Column Handling

When a source has JSON columns (e.g., Sprout API's activity_metadata), specify them in YAML config:

sources:
  sprout_messages:
    json_columns:
      - activity_metadata

This enables JSON normalization in RECORD_HASH calculations, preventing phantom updates from JSON key ordering differences.
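The normalization itself can be as simple as re-serializing with sorted keys; a sketch (illustrative payloads, not real Sprout data):

```python
import json

def normalize_json(raw: str) -> str:
    """Re-serialize with sorted keys and fixed separators so two payloads
    that differ only in key order produce identical strings (and hashes)."""
    return json.dumps(json.loads(raw), sort_keys=True, separators=(",", ":"))

a = '{"channel": "twitter", "type": "reply"}'
b = '{"type": "reply", "channel": "twitter"}'
print(normalize_json(a) == normalize_json(b))  # True -- no phantom update
```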

Copy-then-Peek Pattern

Problem

Traditional S3 → Snowflake pipelines download the entire file to read headers:

# Slow for large files (downloads the entire 10 GB file)
import pandas as pd  # reading s3:// paths also requires s3fs

df = pd.read_csv("s3://bucket/large-file.csv")
headers = df.columns.tolist()

Solution

Copy-then-Peek avoids downloading by:

  1. Copying file within S3 (fast, S3-to-S3)
  2. Reading only first 8KB using range request
from xo_foundry.s3_utils import copy_and_peek_s3_file

headers = copy_and_peek_s3_file(
    source_bucket="xo-ingest",
    source_key="warbyparker/contact_timestamps/2026-01-15/data.csv",
    dest_bucket="xo-stage",
    dest_key="warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv",
    peek_bytes=8192
)
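Under the hood, the two steps map onto two S3 API calls: `copy_object` (server-side, no bytes through the worker) and `get_object` with a `Range` header. A sketch of how this could be wired with boto3 (an assumption about the implementation, not the actual `xo_foundry.s3_utils` code):

```python
def parse_header_bytes(peeked: bytes) -> list:
    """First line of the peeked bytes -> list of raw column names."""
    return peeked.decode("utf-8", errors="replace").splitlines()[0].split(",")

def copy_then_peek(source_bucket, source_key, dest_bucket, dest_key, peek_bytes=8192):
    """S3-to-S3 copy (no download through the worker), then a ranged GET
    of only the first peek_bytes to read the header row."""
    import boto3  # imported here so the sketch is readable without AWS access

    s3 = boto3.client("s3")
    s3.copy_object(
        Bucket=dest_bucket, Key=dest_key,
        CopySource={"Bucket": source_bucket, "Key": source_key},
    )
    body = s3.get_object(
        Bucket=dest_bucket, Key=dest_key, Range=f"bytes=0-{peek_bytes - 1}"
    )["Body"].read()
    return parse_header_bytes(body)
```

Note that 8 KB comfortably fits a header row with hundreds of columns; if a header ever exceeded the peek window, the last column name would be truncated, so the peek size is configurable.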

Performance Benefits

| File Size | Traditional  | Copy-then-Peek | Improvement  |
|-----------|--------------|----------------|--------------|
| 10 MB     | 5 seconds    | 0.5 seconds    | 10x faster   |
| 1 GB      | 120 seconds  | 0.5 seconds    | 240x faster  |
| 10 GB     | 1200 seconds | 0.5 seconds    | 2400x faster |

Key Insight: Header extraction time is constant (~0.5s) regardless of file size.

Airflow Pipeline Example

A complete pipeline using xo-foundry tasks with Airflow 3.0 TaskFlow API:

from airflow.decorators import dag, task
from xo_foundry.tasks.extract_tasks import extract_gladly_data
from xo_foundry.tasks.stage_tasks import copy_and_standardize
from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake
from datetime import datetime

@dag(
    schedule="50 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False
)
def warbyparker_gladly_daily_dag():
    """Extract Gladly reports → S3 → Snowflake BRONZE"""

    # Extract from Gladly API (csv.DictWriter, not pandas)
    ingest = extract_gladly_data(
        metric_set="ContactTimestampsReport",
        domain="warbyparker",
        report_name="contact_timestamps"
    )

    # Stage: Copy-then-Peek, standardize columns
    stage = copy_and_standardize(
        ingest_path=ingest,
        domain="warbyparker",
        report_name="contact_timestamps",
        load_strategy="full_refresh"
    )

    # Load: TRUNCATE + COPY INTO Bronze
    load = copy_to_snowflake(
        stage_path=stage["path"],
        headers=stage["headers"],
        database="WBP_DB",
        schema="BRONZE",
        table="GLADLY_CONTACT_TIMESTAMPS"
    )

warbyparker_gladly_daily_dag()

Error Handling

Extract Failures

from datetime import timedelta
import logging

from airflow.decorators import task

logger = logging.getLogger(__name__)

@task(retries=3, retry_delay=timedelta(minutes=5))
def extract_gladly_data(**context):
    try:
        # ... extraction logic
        return ingest_path
    except Exception as e:
        logger.error(f"Gladly API error: {e}")
        raise

Load Failures

The TRUNCATE + COPY INTO pattern is wrapped in a transaction. If COPY INTO fails, the TRUNCATE is rolled back, preserving the previous data.
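The rollback behavior can be sketched against any DB-API connection (the real task uses the Snowflake connector; the helper name and demo table are illustrative -- the demo uses sqlite3 only so the block is runnable):

```python
import sqlite3

def run_atomic(conn, statements):
    """Execute statements inside one transaction; roll back on any failure
    so the previous table contents survive a failed COPY INTO."""
    cur = conn.cursor()
    cur.execute("BEGIN")
    try:
        for stmt in statements:
            cur.execute(stmt)
        conn.commit()
    except Exception:
        conn.rollback()  # undoes the TRUNCATE as well
        raise

# Demo: a failing second statement leaves the original row in place.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE bronze (id TEXT)")
conn.execute("INSERT INTO bronze VALUES ('old')")
try:
    run_atomic(conn, ["DELETE FROM bronze", "INSERT INTO nonexistent VALUES (1)"])
except sqlite3.OperationalError:
    pass
print(conn.execute("SELECT COUNT(*) FROM bronze").fetchone()[0])  # 1
```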

Monitoring

Airflow Metrics

  • Task duration
  • Success/failure rates
  • Retry counts

Snowflake Metrics

-- Freshness check
SELECT MAX(DATE_TO_WAREHOUSE) AS LATEST_LOAD
FROM WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS;

-- Row counts by layer
SELECT 'BRONZE' AS LAYER, COUNT(*) AS ROW_COUNT FROM WBP_DB.BRONZE.GLADLY_CONTACT_TIMESTAMPS
UNION ALL
SELECT 'SILVER', COUNT(*) FROM WBP_DB.SILVER.CONTACT_TIMESTAMPS
UNION ALL
SELECT 'GOLD', COUNT(*) FROM WBP_DB.GOLD.FCT_CONTACTS;
