# Creating New Pipelines with xo-foundry

**Purpose:** Walk through the end-to-end process of creating a new ELT pipeline — from writing a YAML config to a deployed, running Airflow DAG.
## Prerequisites

- Developer setup complete (developer-setup.md)
- `uv sync` run at the repo root
- Airflow Variables provisioned in Astronomer for your credentials (ask the DE team)
- Bronze DDL migration deployed for your target table (schemachange-deployment.md)
## Overview

xo-foundry generates Airflow DAGs from YAML configuration files. The workflow is: write a YAML config, validate it, generate the DAG, test it locally, then deploy.

You do not write Python DAG code directly. The factory handles task wiring, metadata enrichment, S3 path construction, and Snowflake loading. Your job is to write a correct YAML config.
File locations:

- Production configs: `apps/airflow/xo-pipelines/dags/configs/` — source of truth
- Templates/examples: `packages/xo-foundry/configs/examples/` — copy and modify
- Generated DAGs: `apps/airflow/xo-pipelines/dags/` — never edit these by hand
## Step 1 — Choose a Template

Copy the closest example to your use case:

```bash
# Gladly API pipeline
cp packages/xo-foundry/configs/examples/template-gladly-timestamps.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-{source}-daily.yaml

# Google Sheets pipeline
cp packages/xo-foundry/configs/examples/simple-single-report.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-gsheet-daily.yaml

# Sprout Social pipeline
cp packages/xo-foundry/configs/examples/template-sprout-daily.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-sprout-daily.yaml

# Multi-source pipeline
cp packages/xo-foundry/configs/examples/condenast-multi-source-example.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-multi-daily.yaml
```
Naming convention: `{client}-{source}-{cadence}.yaml` → `warbyparker-gladly-daily.yaml`
## Step 2 — Write the Config

### Minimal config structure

Every config has three top-level sections: `dag`, `globals`, and `sources`.

```yaml
dag:
  domain: warbyparker              # Client name (snake_case)
  pipeline_name: gladly_daily      # Unique pipeline ID (snake_case)
  pipeline_type: snowflake_load    # Always snowflake_load for ELT pipelines
  description: "Warby Parker Gladly reports - daily load to WBP_DB"
  schedule: "50 6 * * *"           # Cron: 6:50 AM UTC daily
  owner: data-engineering
  tags:
    - warbyparker
    - gladly
    - production
  default_args:
    retries: 2
    retry_delay_minutes: 5
    start_date: "2025-01-01"
  time_window:
    refresh_type: daily
    lag:
      days: 1                      # Extract yesterday's data
    timezone: "America/New_York"

globals:
  snowflake:
    database: WBP_DB_DEV           # Dev database — prod is set via environments block
    schema: BRONZE
    connection_id: snowflake_loader
    warehouse: XO_DEV_WH
  s3:
    ingest_prefix: ingest-bucket
    stage_prefix: stage-bucket
  metadata:
    enabled: true
    add_record_key: true
    add_record_hash: true
    add_load_timestamp: true
    add_batch_id: true
    add_pipeline_run_id: true
  dbt:
    enabled: false                 # Set to true if dbt models exist downstream
  environments:
    dev:
      database: WBP_DB_DEV
      schema: BRONZE
      warehouse: XO_DEV_WH
    prod:
      database: WBP_DB
      schema: BRONZE
      warehouse: XO_PROD_WH

sources:
  contact_timestamps:              # Source name (snake_case, becomes part of S3 path)
    source_type: gladly_api
    load_strategy: full_refresh
    extractor:
      metric_set: ContactTimestampsReport
      base_url_var: WARBYPARKER_GLADLY_BASE_URL   # Airflow Variable name
      email_var: WARBYPARKER_GLADLY_EMAIL
      token_var: WARBYPARKER_GLADLY_API_TOKEN
      timezone: America/New_York
      endpoint_type: reports
    paths:
      report_name: contact_timestamps
      filename_pattern: "contact_timestamps_{date}.csv"
    snowflake:
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: batch_replace
      deduplication:
        strategy: composite_key
        unique_columns:
          - CONTACT_ID
          - TIMESTAMP_FIELD
          - EVENT_TYPE
        use_hash: true
```
## Step 3 — Configure Your Source

### Source type reference

Pick the `source_type` that matches your data source, then fill in the `extractor` block accordingly.

#### Gladly API (`gladly_api`)

```yaml
extractor:
  metric_set: ContactTimestampsReport    # Gladly report name (exact string)
  base_url_var: CLIENT_GLADLY_BASE_URL   # Airflow Variable: https://{org}.gladly.com
  email_var: CLIENT_GLADLY_EMAIL
  token_var: CLIENT_GLADLY_API_TOKEN
  timezone: America/New_York             # Client's local timezone
  endpoint_type: reports                 # Usually "reports"
```
Common `metric_set` values: `ContactTimestampsReport`, `ContactExportReportV3`, `AgentDurationReport`, `ConversationTimestampsReport`, `WorkSessionReport`
#### Google Sheets (`gsheet`)

```yaml
extractor:
  credentials_var: DATA_ENGINEERING_SERVICE_ACCOUNT  # Airflow Variable: service account JSON
  spreadsheet_id: "1DOsvWTzFi796eqYXQ-OceWp7lGndvSbtUH-20xTRcMc"
  sheet_name: ACTIVE     # Sheet tab name (exact, case-sensitive)
  range: "A:Z"           # Optional: limit to specific columns
  columns:               # Optional: whitelist of columns to keep
    - employee_id
    - name
    - status
```
Use the `incremental` load strategy with `single_field` deduplication for sheets that have a unique ID column, as in the sketch below.
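A minimal sketch of that combination, following the config shape from Step 2; the source name, target table, and ID column here are illustrative, not from a real pipeline:

```yaml
sources:
  employee_roster:                   # hypothetical source name
    source_type: gsheet
    load_strategy: incremental       # source-level: accumulate extracts, dedup in the warehouse
    extractor:
      credentials_var: DATA_ENGINEERING_SERVICE_ACCOUNT
      spreadsheet_id: "1DOsvWTzFi796eqYXQ-OceWp7lGndvSbtUH-20xTRcMc"
      sheet_name: ACTIVE
    snowflake:
      target_table: EMPLOYEE_ROSTER  # hypothetical Bronze table
      load_strategy: batch_replace
      deduplication:
        strategy: single_field
        unique_columns:
          - EMPLOYEE_ID              # the sheet's unique ID column
        use_hash: false
```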
#### Sprout Social (`sprout_api`)

```yaml
extractor:
  api_key_var: CLIENT_SPROUT_API_KEY
  client_id_var: CLIENT_SPROUT_CLIENT_ID
  group_id: "2184345"          # Sprout customer group ID
  timezone: America/New_York
  post_types:                  # Message types to include
    - TWITTER_DIRECT_MESSAGE
    - FACEBOOK_MESSENGER_PM
    - INSTAGRAM_DIRECT_MESSAGE
  profile_ids:                 # Optional: limit to specific profiles
    - "6066623"
  fields:                      # Optional: field whitelist
    - guid
    - post_type
    - created_time
    - activity_metadata        # JSON column — add to json_columns below
  date_anchor: created_time    # or action_last_update_time
  page_limit: 100              # Pagination limit per request
```
If any field contains JSON (like activity_metadata), add it to `json_columns` at the source level, as shown below.
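A sketch of the placement, assuming `json_columns` sits directly under the source entry alongside `source_type` and `extractor` (the source name is illustrative):

```yaml
sources:
  sprout_messages:            # hypothetical source name
    source_type: sprout_api
    json_columns:             # fields listed here are treated as JSON, not flat text
      - activity_metadata
    extractor:
      # ... as above
```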
#### Gmail (`gmail`)

```yaml
extractor:
  email_address_var: CLIENT_GMAIL_ADDRESS
  credentials_var: CLIENT_GMAIL_CREDENTIALS
  auth_var: CLIENT_GMAIL_AUTH
  query_string: "subject:Daily Report has:attachment"  # Raw Gmail query
  # OR use structured query:
  query:
    subject: "Daily Report"
    has: attachment
    filename: "report.csv"
```
#### Google Drive (`gdrive`)

```yaml
extractor:
  credentials_var: CLIENT_GDRIVE_CREDENTIALS
  auth_type: service_account   # or oauth2
  file_name: "daily_export.csv"
  folder_id: "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgVE2upms"
```
## Step 4 — Choose Load and Dedup Strategy

### Load strategy (source-level `load_strategy`)

This controls how extracted data arrives in S3 and whether it replaces or accumulates.

| Strategy | S3 path includes | Use when |
|---|---|---|
| `full_refresh` | `full_refresh/{date}/` | You pull a full snapshot daily (most Gladly reports) |
| `incremental` | `incremental/{timestamp}/` | You pull all records and dedup in the warehouse (Google Sheets) |
| `historical` | `historical/{ts}/{date}/` | Complex, avoid — negotiate for daily snapshots instead |
### Bronze load strategy (snowflake-level `load_strategy`)

This controls how data is written into the Bronze table.

| Strategy | Behavior | Use when |
|---|---|---|
| `batch_replace` | DELETE WHERE BATCH_ID + COPY INTO | Default for new pipelines — preserves history by batch |
| `truncate_insert` | TRUNCATE + COPY INTO | Legacy pipelines only — destructive, avoid for new work |

Rule: All new pipelines should use `batch_replace` at the Snowflake level (see ADR 011).
### Deduplication strategy

The `deduplication` block controls how RECORD_KEY is generated for each row.

```yaml
# Single unique column (simple ID)
deduplication:
  strategy: single_field
  unique_columns:
    - CONTACT_ID
  use_hash: false   # Use raw value as RECORD_KEY

# Single unique column, hashed (e.g., long strings)
deduplication:
  strategy: single_field
  unique_columns:
    - CONTACT_ID
  use_hash: true    # MD5(CONTACT_ID) as RECORD_KEY

# Multiple columns define uniqueness
deduplication:
  strategy: composite_key
  unique_columns:
    - CONTACT_ID
    - EVENT_TYPE
    - TIMESTAMP_FIELD
  use_hash: true    # MD5(CONTACT_ID||EVENT_TYPE||TIMESTAMP_FIELD)
```
## Step 5 — Configure the Time Window

The time window controls which date range to extract.

```yaml
# Daily — extract yesterday (most common)
time_window:
  refresh_type: daily
  lag:
    days: 1
  timezone: "America/New_York"

# Daily — extract current day (no lag)
time_window:
  refresh_type: daily
  timezone: "America/New_York"

# Intraday — rolling 8-hour window, 30-minute lag
time_window:
  refresh_type: intraday
  lag:
    minutes: 30
  lookback: "8 hours"
  timezone: "America/New_York"
```
LagConfig fields: `days`, `hours`, `minutes` — all optional, default 0.

For intraday pipelines with multiple refreshes (e.g., morning + evening), use `refresh_pattern: intraday` with a per-refresh `refreshes` list. Each refresh generates a separate DAG. See `packages/xo-foundry/configs/examples/template-sprout-intraday.yaml`, and the sketch below.
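A rough sketch of that shape: only `refresh_pattern` and `refreshes` come from the text above; the per-refresh keys are guesses, so treat the template file as authoritative.

```yaml
time_window:
  refresh_pattern: intraday
  refreshes:                   # each entry generates a separate DAG
    - name: morning            # hypothetical per-refresh fields
      schedule: "0 12 * * *"
    - name: evening
      schedule: "0 22 * * *"
```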
## Step 6 — Enable dbt (if models exist)

If dbt Silver/Gold models consume this Bronze table, enable dbt in `globals`:

```yaml
globals:
  dbt:
    enabled: true
    project_dir: /usr/local/airflow/dbt/xo_medallion
    target: dev
    connection_id: snowflake_dbt
    select: "tag:gladly+"    # Cosmos dbt selector (defaults to tag:{domain})
    # exclude: "model_name"  # Optional: skip specific models
```
The `select` field uses dbt's selector syntax. `tag:gladly+` runs all models tagged `gladly` and their downstream dependents.
## Step 7 — Validate and Generate

### Validate the config

```bash
uv run xo-foundry validate-config \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml
```

Expected output:

```
✓ Config valid
DAG ID: warbyparker_gladly_daily
Domain: warbyparker
Pipeline: gladly_daily
Sources: 1
Pipeline type: snowflake_load
```
Fix any validation errors before proceeding.
### Generate the DAG

```bash
uv run xo-foundry generate-dag \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --output apps/airflow/xo-pipelines/dags/
```

This writes `apps/airflow/xo-pipelines/dags/warbyparker_gladly_daily.py`.
Never hand-edit the generated DAG Python file. Re-run the generator after changing the YAML config.
## Step 8 — Test Locally

```bash
# Start local Airflow
cd apps/airflow/xo-pipelines
astro dev start

# Verify the DAG appears in the Airflow UI (http://localhost:8080)
# Trigger a manual run with a test date
# Check task logs for errors
```

If the extractor is new, test it in isolation first:

```bash
uv run xo-foundry test-extractor \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --source contact_timestamps \
  --date 2025-01-15
```
## Step 9 — Deploy
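The deploy mechanism itself is team-specific (often CI on merge). If yours uses the Astronomer CLI directly, a manual deploy would look like the sketch below; confirm the DE team's process before using it.

```bash
# Sketch of a manual deploy: pushes the local Astro project to its
# configured Astronomer deployment (assumes your team deploys via CLI)
cd apps/airflow/xo-pipelines
astro deploy
```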
After deploy, verify in the Astronomer UI that the DAG appears, is unpaused, and runs successfully on its next scheduled execution.
## Step 10 — Update Inventory

After successful production runs, update the Snowflake object inventory:

- `docs/reference/snowflake-object-inventory.md` — add the new Bronze table
## Multi-Source Pipelines

To extract multiple reports in a single DAG, add multiple entries under `sources`:

```yaml
sources:
  contact_timestamps:
    source_type: gladly_api
    # ...
  work_sessions:
    source_type: gladly_api
    extractor:
      metric_set: WorkSessionReport
      # same credentials vars
    paths:
      report_name: work_sessions
      filename_pattern: "work_sessions_{date}.csv"
    snowflake:
      target_table: GLADLY_WORK_SESSIONS
      load_strategy: batch_replace
      deduplication:
        strategy: single_field
        unique_columns: [SESSION_ID]
        use_hash: true
```
Each source runs as a parallel task group. All sources in a DAG share the globals (same Snowflake database, S3 prefixes, dbt config).
## Verification Checklist

Before opening a PR:

- Config validates: `uv run xo-foundry validate-config --config <path>`
- DAG generates without errors: `uv run xo-foundry generate-dag --config <path> --output dags/`
- DAG appears in local Airflow UI
- Manual trigger completes with expected row counts in the Bronze table
- Bronze DDL migration already deployed (table exists in target DB)
- Airflow Variables provisioned for all `*_var` credential fields
- Config file committed to `apps/airflow/xo-pipelines/dags/configs/`
- Generated DAG file committed to `apps/airflow/xo-pipelines/dags/`
- Snowflake object inventory updated
## Troubleshooting

**validate-config fails with a Pydantic validation error**

Read the error message — it identifies the exact field and constraint. Common mistakes:

- `start_date` must be "YYYY-MM-DD" format (a quoted string; see the snippet after this list)
- `schedule` must be a valid cron expression
- `connection_id` under `globals.snowflake` is required (often forgotten)
- `source_type` in the extractor block must match the parent source's `source_type`
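A minimal illustration of the `start_date` gotcha, assuming the validator expects a string as the first bullet states:

```yaml
default_args:
  start_date: "2025-01-01"   # correct: quoted, validates as a string
  # start_date: 2025-01-01   # fails: YAML parses an unquoted date as a date object
```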
**DAG does not appear in the Airflow UI**

The generated DAG file may have a syntax error. Check Airflow's Import Errors page. Re-run `generate-dag` and look for template rendering errors in the output.
**Extractor returns zero records**

- Verify the Airflow Variable names match exactly (case-sensitive)
- Check the time window: if `lag.days: 1` and you're testing on a Monday, it pulls Sunday's data
- Run `test-extractor` in isolation and inspect the output
**Insufficient privileges in Snowflake**

Stop immediately. Do not retry or work around it. Report the exact error to the engineering team — permissions are managed centrally. See snowflake-safety.md.
**RECORD_KEY collisions (duplicate rows in Bronze)**

Your deduplication strategy is too narrow. Add more columns to `unique_columns` or switch from `single_field` to `composite_key`. Check the source data to understand which combination of fields is actually unique.
## Related
- New Extractor Checklist — adding a new extractor class to xo-core
- Schemachange Deployment — deploying Bronze DDL migrations
- dbt Development Workflow — adding Silver/Gold models downstream
- xo-foundry Roadmap — planned features and phases
- ADR 001 — load strategy definitions
- ADR 011 — batch_replace vs truncate_insert