Creating New Pipelines with xo-foundry

Purpose: Walk through the end-to-end process of creating a new ELT pipeline — from writing a YAML config to a deployed, running Airflow DAG.


Prerequisites

  • Developer setup complete (developer-setup.md)
  • uv sync run at the repo root
  • Airflow Variables provisioned in Astronomer for your credentials (ask the DE team)
  • Bronze DDL migration deployed for your target table (schemachange-deployment.md)

Overview

xo-foundry generates Airflow DAGs from YAML configuration files. The workflow is:

Write YAML config → Validate → Generate DAG → Deploy to Astronomer

You do not write Python DAG code directly. The factory handles task wiring, metadata enrichment, S3 path construction, and Snowflake loading. Your job is to write a correct YAML config.
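
On the command line, that loop looks roughly like this (a condensed sketch of the commands covered in Steps 7-9; the config path is illustrative):

# 1. Validate the YAML config
uv run xo-foundry validate-config \
  --config apps/airflow/xo-pipelines/dags/configs/{client}-{source}-daily.yaml

# 2. Generate the Airflow DAG from the config
uv run xo-foundry generate-dag \
  --config apps/airflow/xo-pipelines/dags/configs/{client}-{source}-daily.yaml \
  --output apps/airflow/xo-pipelines/dags/

# 3. Test locally, then deploy to Astronomer
cd apps/airflow/xo-pipelines
astro dev start
astro deploy <deployment-id>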

File locations:

  • Production configs: apps/airflow/xo-pipelines/dags/configs/ — source of truth
  • Templates/examples: packages/xo-foundry/configs/examples/ — copy and modify
  • Generated DAGs: apps/airflow/xo-pipelines/dags/ — never edit these by hand


Step 1 — Choose a Template

Copy the closest example to your use case:

# Gladly API pipeline
cp packages/xo-foundry/configs/examples/template-gladly-timestamps.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-{source}-daily.yaml

# Google Sheets pipeline
cp packages/xo-foundry/configs/examples/simple-single-report.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-gsheet-daily.yaml

# Sprout Social pipeline
cp packages/xo-foundry/configs/examples/template-sprout-daily.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-sprout-daily.yaml

# Multi-source pipeline
cp packages/xo-foundry/configs/examples/condenast-multi-source-example.yaml \
   apps/airflow/xo-pipelines/dags/configs/{client}-multi-daily.yaml

Naming convention: {client}-{source}-{cadence}.yaml, e.g. warbyparker-gladly-daily.yaml


Step 2 — Write the Config

Minimal config structure

Every config has three top-level sections: dag, globals, and sources.

dag:
  domain: warbyparker              # Client name (snake_case)
  pipeline_name: gladly_daily      # Unique pipeline ID (snake_case)
  pipeline_type: snowflake_load    # Always snowflake_load for ELT pipelines
  description: "Warby Parker Gladly reports - daily load to WBP_DB"
  schedule: "50 6 * * *"          # Cron: 6:50 AM UTC daily
  owner: data-engineering
  tags:
    - warbyparker
    - gladly
    - production
  default_args:
    retries: 2
    retry_delay_minutes: 5
    start_date: "2025-01-01"

  time_window:
    refresh_type: daily
    lag:
      days: 1                      # Extract yesterday's data
    timezone: "America/New_York"

globals:
  snowflake:
    database: WBP_DB_DEV           # Dev database — prod is set via environments block
    schema: BRONZE
    connection_id: snowflake_loader
    warehouse: XO_DEV_WH

  s3:
    ingest_prefix: ingest-bucket
    stage_prefix: stage-bucket

  metadata:
    enabled: true
    add_record_key: true
    add_record_hash: true
    add_load_timestamp: true
    add_batch_id: true
    add_pipeline_run_id: true

  dbt:
    enabled: false                 # Set to true if dbt models exist downstream

  environments:
    dev:
      database: WBP_DB_DEV
      schema: BRONZE
      warehouse: XO_DEV_WH
    prod:
      database: WBP_DB
      schema: BRONZE
      warehouse: XO_PROD_WH

sources:
  contact_timestamps:              # Source name (snake_case, becomes part of S3 path)
    source_type: gladly_api
    load_strategy: full_refresh
    extractor:
      metric_set: ContactTimestampsReport
      base_url_var: WARBYPARKER_GLADLY_BASE_URL   # Airflow Variable name
      email_var: WARBYPARKER_GLADLY_EMAIL
      token_var: WARBYPARKER_GLADLY_API_TOKEN
      timezone: America/New_York
      endpoint_type: reports
    paths:
      report_name: contact_timestamps
      filename_pattern: "contact_timestamps_{date}.csv"
    snowflake:
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: batch_replace
      deduplication:
        strategy: composite_key
        unique_columns:
          - CONTACT_ID
          - TIMESTAMP_FIELD
          - EVENT_TYPE
        use_hash: true

Step 3 — Configure Your Source

Source type reference

Pick the source_type that matches your data source, then fill in the extractor block accordingly.

Gladly API (gladly_api)

extractor:
  metric_set: ContactTimestampsReport   # Gladly report name (exact string)
  base_url_var: CLIENT_GLADLY_BASE_URL  # Airflow Variable: https://{org}.gladly.com
  email_var: CLIENT_GLADLY_EMAIL
  token_var: CLIENT_GLADLY_API_TOKEN
  timezone: America/New_York            # Client's local timezone
  endpoint_type: reports                # Usually "reports"

Common metric_set values: ContactTimestampsReport, ContactExportReportV3, AgentDurationReport, ConversationTimestampsReport, WorkSessionReport

Google Sheets (gsheet)

extractor:
  credentials_var: DATA_ENGINEERING_SERVICE_ACCOUNT  # Airflow Variable: service account JSON
  spreadsheet_id: "1DOsvWTzFi796eqYXQ-OceWp7lGndvSbtUH-20xTRcMc"
  sheet_name: ACTIVE                    # Sheet tab name (exact, case-sensitive)
  range: "A:Z"                          # Optional: limit to specific columns
  columns:                              # Optional: whitelist of columns to keep
    - employee_id
    - name
    - status

Use the incremental load strategy with single_field deduplication for sheets that have a unique ID column, as in the sketch below.
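
A minimal sketch of that combination, reusing the config schema from Step 2 (source, table, and column names are illustrative; only the fields relevant here are shown):

sources:
  employee_roster:                   # illustrative source name
    source_type: gsheet
    load_strategy: incremental       # source-level: extracts accumulate, dedup happens in the warehouse
    extractor:
      credentials_var: DATA_ENGINEERING_SERVICE_ACCOUNT
      spreadsheet_id: "<spreadsheet-id>"
      sheet_name: ACTIVE
    snowflake:
      target_table: GSHEET_EMPLOYEE_ROSTER   # illustrative table name
      load_strategy: batch_replace
      deduplication:
        strategy: single_field
        unique_columns:
          - EMPLOYEE_ID              # the sheet's unique ID column
        use_hash: false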

Sprout Social (sprout_api)

extractor:
  api_key_var: CLIENT_SPROUT_API_KEY
  client_id_var: CLIENT_SPROUT_CLIENT_ID
  group_id: "2184345"                   # Sprout customer group ID
  timezone: America/New_York
  post_types:                           # Message types to include
    - TWITTER_DIRECT_MESSAGE
    - FACEBOOK_MESSENGER_PM
    - INSTAGRAM_DIRECT_MESSAGE
  profile_ids:                          # Optional: limit to specific profiles
    - "6066623"
  fields:                               # Optional: field whitelist
    - guid
    - post_type
    - created_time
    - activity_metadata               # JSON column — add to json_columns below
  date_anchor: created_time             # or action_last_update_time
  page_limit: 100                       # Pagination limit per request

If any field contains JSON (like activity_metadata), add it to json_columns at the source level:

sources:
  sprout_messages:
    json_columns:
      - activity_metadata

Gmail (gmail)

extractor:
  email_address_var: CLIENT_GMAIL_ADDRESS
  credentials_var: CLIENT_GMAIL_CREDENTIALS
  auth_var: CLIENT_GMAIL_AUTH
  query_string: "subject:Daily Report has:attachment"  # Raw Gmail query
  # OR use structured query:
  query:
    subject: "Daily Report"
    has: attachment
    filename: "report.csv"

Google Drive (gdrive)

extractor:
  credentials_var: CLIENT_GDRIVE_CREDENTIALS
  auth_type: service_account   # or oauth2
  file_name: "daily_export.csv"
  folder_id: "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgVE2upms"

Step 4 — Choose Load and Dedup Strategy

Load strategy (source-level load_strategy)

This controls how extracted data arrives in S3 and whether it replaces or accumulates.

  • full_refresh: S3 path includes full_refresh/{date}/. Use when you pull a full snapshot daily (most Gladly reports).
  • incremental: S3 path includes incremental/{timestamp}/. Use when you pull all records and dedup in the warehouse (Google Sheets).
  • historical: S3 path includes historical/{ts}/{date}/. Complex; avoid it and negotiate for daily snapshots instead.

Bronze load strategy (snowflake-level load_strategy)

This controls how data is written into the Bronze table.

  • batch_replace: DELETE WHERE BATCH_ID + COPY INTO. Default for new pipelines; preserves history by batch.
  • truncate_insert: TRUNCATE + COPY INTO. Legacy pipelines only; destructive, avoid for new work.

Rule: All new pipelines should use batch_replace at the Snowflake level (see ADR 011).
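
Putting the two levels together, the relevant fields of a source entry look like this (trimmed from the Step 2 example):

sources:
  contact_timestamps:
    load_strategy: full_refresh        # source-level: how extracts land in S3
    snowflake:
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: batch_replace     # Snowflake-level: how Bronze is written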

Deduplication strategy

The deduplication block controls how RECORD_KEY is generated for each row.

# Single unique column (simple ID)
deduplication:
  strategy: single_field
  unique_columns:
    - CONTACT_ID
  use_hash: false   # Use raw value as RECORD_KEY

# Single unique column, hashed (e.g., long strings)
deduplication:
  strategy: single_field
  unique_columns:
    - CONTACT_ID
  use_hash: true    # MD5(CONTACT_ID) as RECORD_KEY

# Multiple columns define uniqueness
deduplication:
  strategy: composite_key
  unique_columns:
    - CONTACT_ID
    - EVENT_TYPE
    - TIMESTAMP_FIELD
  use_hash: true    # MD5(CONTACT_ID||EVENT_TYPE||TIMESTAMP_FIELD)

Step 5 — Configure the Time Window

The time window controls which date range to extract.

# Daily — extract yesterday (most common)
time_window:
  refresh_type: daily
  lag:
    days: 1
  timezone: "America/New_York"

# Daily — extract current day (no lag)
time_window:
  refresh_type: daily
  timezone: "America/New_York"

# Intraday — rolling 8-hour window, 30-minute lag
time_window:
  refresh_type: intraday
  lag:
    minutes: 30
  lookback: "8 hours"
  timezone: "America/New_York"

LagConfig fields: days, hours, minutes — all optional, default 0.

For intraday pipelines with multiple refreshes (e.g., morning + evening), use refresh_pattern: intraday with a refreshes list (one entry per refresh); each entry generates a separate DAG. See packages/xo-foundry/configs/examples/template-sprout-intraday.yaml.


Step 6 — Enable dbt (if models exist)

If dbt Silver/Gold models consume this Bronze table, enable dbt in globals:

globals:
  dbt:
    enabled: true
    project_dir: /usr/local/airflow/dbt/xo_medallion
    target: dev
    connection_id: snowflake_dbt
    select: "tag:gladly+"    # Cosmos dbt selector (defaults to tag:{domain})
    # exclude: "model_name"  # Optional: skip specific models

The select field uses dbt's selector syntax. tag:gladly+ runs all models tagged gladly and their downstream dependents.
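
A couple of selector variants as a sketch (standard dbt node-selection syntax; the model name below is illustrative):

globals:
  dbt:
    enabled: true
    select: "tag:gladly"          # only models tagged gladly, no dependents
    # select: "tag:gladly+"       # tagged models plus everything downstream
    # select: "+fct_contacts"     # an illustrative model plus everything upstream of it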


Step 7 — Validate and Generate

Validate the config

uv run xo-foundry validate-config \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml

Expected output:

✓ Config valid
  DAG ID:       warbyparker_gladly_daily
  Domain:       warbyparker
  Pipeline:     gladly_daily
  Sources:      1
  Pipeline type: snowflake_load

Fix any validation errors before proceeding.

Generate the DAG

uv run xo-foundry generate-dag \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --output apps/airflow/xo-pipelines/dags/

This writes apps/airflow/xo-pipelines/dags/warbyparker_gladly_daily.py.

Never hand-edit the generated DAG Python file. Re-run the generator after changing the YAML config.


Step 8 — Test Locally

# Start local Airflow
cd apps/airflow/xo-pipelines
astro dev start

# Verify the DAG appears in the Airflow UI (http://localhost:8080)
# Trigger a manual run with a test date
# Check task logs for errors

If the extractor is new, test it in isolation first:

uv run xo-foundry test-extractor \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --source contact_timestamps \
  --date 2025-01-15

Step 9 — Deploy

cd apps/airflow/xo-pipelines
astro deploy <deployment-id>

After deploy, verify in the Astronomer UI that the DAG appears, is unpaused, and runs successfully on its next scheduled execution.


Step 10 — Update Inventory

After successful production runs, update the Snowflake object inventory.


Multi-Source Pipelines

To extract multiple reports in a single DAG, add multiple entries under sources:

sources:
  contact_timestamps:
    source_type: gladly_api
    # ...

  work_sessions:
    source_type: gladly_api
    extractor:
      metric_set: WorkSessionReport
      # same credentials vars
    paths:
      report_name: work_sessions
      filename_pattern: "work_sessions_{date}.csv"
    snowflake:
      target_table: GLADLY_WORK_SESSIONS
      load_strategy: batch_replace
      deduplication:
        strategy: single_field
        unique_columns: [SESSION_ID]
        use_hash: true

Each source runs as a parallel task group. All sources in a DAG share the globals (same Snowflake database, S3 prefixes, dbt config).


Verification Checklist

Before opening a PR:

  • Config validates: uv run xo-foundry validate-config --config <path>
  • DAG generates without errors: uv run xo-foundry generate-dag --config <path> --output dags/
  • DAG appears in local Airflow UI
  • Manual trigger completes with expected row counts in Bronze table
  • Bronze DDL migration already deployed (table exists in target DB)
  • Airflow Variables provisioned for all *_var credential fields
  • Config file committed to apps/airflow/xo-pipelines/dags/configs/
  • Generated DAG file committed to apps/airflow/xo-pipelines/dags/
  • Snowflake object inventory updated

Troubleshooting

validate-config fails with Pydantic validation error
Read the error message — it identifies the exact field and constraint. Common mistakes:

  • start_date must be "YYYY-MM-DD" format (quoted string)
  • schedule must be a valid cron expression
  • connection_id under globals.snowflake is required (often forgotten)
  • source_type in the extractor block must match the parent source's source_type

DAG does not appear in Airflow UI
The generated DAG file may have a syntax error. Check Airflow's Import Errors page. Re-run generate-dag and look for template rendering errors in the output.

Extractor returns zero records

  • Verify the Airflow Variable names match exactly (case-sensitive)
  • Check the time window: if lag.days: 1 and you're testing on a Monday, it pulls Sunday's data
  • Run test-extractor in isolation and inspect the output

Insufficient privileges in Snowflake
Stop immediately. Do not retry or work around. Report the exact error to the engineering team — permissions are managed centrally. See snowflake-safety.md.

RECORD_KEY collisions (duplicate rows in Bronze)
Your deduplication strategy is too narrow. Add more columns to unique_columns or switch from single_field to composite_key. Check the source data to understand what combination of fields is actually unique.