xo-foundry Package

xo-foundry v0.10.0 -- The orchestration layer for the XO-Data platform. Provides a DAG Factory for generating Airflow DAGs from YAML configuration, a modular task library, time window management, and CLI tools for pipeline operations.

Installation

# Install xo-foundry package
uv sync --package xo-foundry

# Add as dependency to another package
uv add --package xo-pipelines xo-foundry

Package Structure

packages/xo-foundry/src/xo_foundry/
├── cli/                        # CLI tools
│   ├── generate_dags.py       # xo-foundry generate-dag
│   └── test_extractor.py      # xo-foundry test-extractor
├── dag_factory/                # YAML → Python DAG generation
│   ├── factory.py             # DAGFactory class (Jinja2 rendering)
│   ├── builders/
│   │   └── path_builder.py    # S3 path construction
│   └── templates/             # Jinja2 DAG templates
├── schemas/                    # Pydantic configuration models
│   ├── dag_config.py          # DAGConfig, SourceConfig, TimeWindowConfig
│   └── pipeline_config.py     # Legacy pipeline config
├── tasks/                      # Airflow task library
│   ├── extract_tasks.py       # Gladly, Sprout, GSheet extraction
│   ├── stage_tasks.py         # S3 staging/transformation
│   ├── snowflake_tasks.py     # COPY INTO, dedup, metadata
│   ├── merge_tasks.py         # Merge operations (legacy_elt)
│   └── task_utils.py          # Shared task utilities
├── time_window/                # Time window management
│   ├── calculator.py          # Central time window logic
│   └── duration_parser.py     # Parse duration strings
├── s3_utils.py                 # Copy-then-Peek pattern
└── __init__.py

DAG Factory

The DAG Factory generates production-ready Airflow DAGs from YAML configuration files. It uses Pydantic for validation and Jinja2 for template rendering.

How It Works

YAML Config → Pydantic Validation → Jinja2 Template → Python DAG File
  1. Validate: YAML is parsed and validated against the DAGConfig Pydantic model
  2. Build paths: S3 paths are constructed using path_builder.py based on domain, report, and load strategy
  3. Render: Jinja2 templates generate complete Python DAG files with TaskFlow API decorators
  4. Output: Production-ready .py files placed in the DAGs directory
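The four steps above can be sketched end to end. This is a stdlib-only illustration: a dataclass stands in for the Pydantic DAGConfig model, and str.format stands in for Jinja2 rendering. The template text here is invented for the sketch, not the real one from templates/:

```python
from dataclasses import dataclass

# Stand-in for the Pydantic DAGConfig model (schemas/dag_config.py).
@dataclass
class DagMeta:
    domain: str
    pipeline_name: str
    pipeline_type: str
    schedule: str

    def __post_init__(self):
        # Fail fast on an unknown pipeline type, as Pydantic validation would.
        allowed = {"snowflake_load", "intraday_refresh", "legacy_elt"}
        if self.pipeline_type not in allowed:
            raise ValueError(f"unknown pipeline_type: {self.pipeline_type}")

# Stand-in for a Jinja2 DAG template (templates/); the real template is richer.
TEMPLATE = """\
from airflow.decorators import dag

@dag(dag_id="{domain}_{pipeline_name}", schedule="{schedule}", catchup=False)
def {domain}_{pipeline_name}_dag():
    ...

{domain}_{pipeline_name}_dag()
"""

def render_dag(raw: dict) -> str:
    """Validate the parsed YAML dict, then render a Python DAG file."""
    DagMeta(**raw)                 # steps 1-2: validate (raises on bad input)
    return TEMPLATE.format(**raw)  # step 3: render

# Step 4: `source` would be written out as a .py file in the DAGs directory.
source = render_dag({
    "domain": "warbyparker",
    "pipeline_name": "gladly_daily",
    "pipeline_type": "snowflake_load",
    "schedule": "50 6 * * *",
})
```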

CLI Commands

# Generate DAG from YAML config
uv run xo-foundry generate-dag \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
  --output apps/airflow/xo-pipelines/dags/

# Validate config without generating
uv run xo-foundry validate-config \
  --config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml

Pipeline Types

| Type | Description | Use Case |
| --- | --- | --- |
| snowflake_load | Extract → Stage → COPY INTO Bronze | Standard daily/intraday pipelines |
| intraday_refresh | Multiple refreshes per day with overlapping windows | Near-real-time data |
| legacy_elt | Extract → Merge into existing tables | OPERATIONS database migrations |

YAML Configuration Example

Based on the production warbyparker-gladly-daily.yaml:

dag:
  domain: warbyparker
  pipeline_name: gladly_daily
  pipeline_type: snowflake_load
  schedule: "50 6 * * *"  # 6:50 AM EST daily
  time_window:
    refresh_type: daily
    lag:
      days: 1           # Extract yesterday's data
    timezone: "America/New_York"

globals:
  snowflake:
    database: WBP_DB_DEV
    schema: BRONZE
    connection_id: snowflake_dev
    warehouse: XO_DEV_WH

sources:
  contact_timestamps:
    source_type: gladly_api
    load_strategy: full_refresh
    extractor:
      metric_set: ContactTimestampsReport
      base_url_var: WARBYPARKER_GLADLY_BASE_URL
      email_var: WARBYPARKER_GLADLY_EMAIL
      token_var: WARBYPARKER_GLADLY_API_TOKEN
    paths:
      report_name: contact_timestamps
      filename_pattern: "contact_timestamps_{date}.csv"
    snowflake:
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: truncate_insert
      deduplication:
        strategy: composite_key
        unique_columns:
          - CONTACT_ID
          - TIMESTAMP_FIELD
          - EVENT_TYPE
        use_hash: true
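A composite-key deduplication like the one configured above typically resolves to a hash over the unique columns plus a ROW_NUMBER filter. The SQL builder below is a hypothetical sketch (statement shape is an assumption; the real statements come from snowflake_tasks.py and may differ):

```python
def build_dedup_sql(database: str, schema: str, table: str,
                    unique_columns: list[str]) -> str:
    """Sketch: keep one row per composite key via a hash + ROW_NUMBER()."""
    fq = f"{database}.{schema}.{table}"
    keys = ", ".join(unique_columns)
    return (
        f"CREATE OR REPLACE TABLE {fq} AS\n"
        f"SELECT * FROM {fq}\n"
        f"QUALIFY ROW_NUMBER() OVER (\n"
        f"    PARTITION BY HASH({keys})\n"
        f"    ORDER BY {unique_columns[0]}\n"
        f") = 1"
    )

sql = build_dedup_sql(
    "WBP_DB_DEV", "BRONZE", "GLADLY_CONTACT_TIMESTAMPS",
    ["CONTACT_ID", "TIMESTAMP_FIELD", "EVENT_TYPE"],
)
```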

Production configs location

Production YAML configs live in apps/airflow/xo-pipelines/dags/configs/ (source of truth). The packages/xo-foundry/configs/ directory is for examples and templates only.

Pydantic Schema Reference

The configuration schema is defined in packages/xo-foundry/src/xo_foundry/schemas/dag_config.py.

Key Models

| Model | Purpose |
| --- | --- |
| DAGConfig | Root config: dag + globals + sources |
| DAGMetadata | Domain, pipeline_name, pipeline_type, schedule, time_window |
| TimeWindowConfig | Daily/intraday modes, lag, lookback, timezone |
| LagConfig | Structured lag: {days: 1}, {hours: 8}, {minutes: 30} |
| GlobalConfig | Shared Snowflake settings (database, schema, warehouse) |
| SourceConfig | Per-source: source_type, load_strategy, extractor, paths, snowflake |
| ExtractorConfig | Polymorphic: Gladly, Sprout, GSheet, S3, Gmail extractors |
| DeduplicationConfig | Strategy, unique_columns, use_hash |

TimeWindowConfig

# Daily (extract yesterday)
time_window:
  refresh_type: daily
  lag: { days: 1 }
  timezone: "America/New_York"

# Daily (no lag, current day)
time_window:
  refresh_type: daily
  timezone: "America/New_York"

# Intraday (relative, 8hr lookback)
time_window:
  refresh_type: intraday
  lag: { minutes: 30 }
  lookback: "8 hours"
  timezone: "America/New_York"

# Intraday (absolute times)
time_window:
  refresh_type: intraday
  start_time: "2026-01-15T17:00:00"
  end_time: "2026-01-16T09:00:00"
  timezone: "America/New_York"

LagConfig structure

The lag field uses a structured LagConfig model (dict with days/hours/minutes), not string durations. Omitting lag means zero lag (current day).
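How a structured lag resolves to a timedelta can be sketched with a dataclass stand-in for LagConfig (fields defaulting to zero is the assumption behind "omitting lag means zero lag"; the real model is in schemas/dag_config.py):

```python
from dataclasses import dataclass
from datetime import timedelta

@dataclass
class Lag:
    """Stand-in for LagConfig: {days: 1}, {hours: 8}, {minutes: 30}."""
    days: int = 0
    hours: int = 0
    minutes: int = 0

    def to_timedelta(self) -> timedelta:
        return timedelta(days=self.days, hours=self.hours, minutes=self.minutes)

# lag: { days: 1 } → extract yesterday; no lag block → timedelta(0)
one_day = Lag(days=1).to_timedelta()
no_lag = Lag().to_timedelta()
```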

Task Library

Extract Tasks (extract_tasks.py)

Extraction tasks for various data sources. All extraction uses native Python csv.DictWriter -- never pandas.

from xo_foundry.tasks.extract_tasks import extract_gladly_data

# In a DAG
@dag(schedule="50 6 * * *", catchup=False)
def warbyparker_gladly_daily_dag():
    ingest_path = extract_gladly_data(
        metric_set="ContactTimestampsReport",
        domain="warbyparker",
        report_name="contact_timestamps",
    )

Supported extractors: Gladly API, Sprout Social API, Google Sheets, Gmail, S3

Stage Tasks (stage_tasks.py)

S3-to-S3 staging with column standardization via Copy-then-Peek.

from xo_foundry.tasks.stage_tasks import copy_and_standardize

stage_info = copy_and_standardize(
    ingest_path=ingest_path,
    domain="warbyparker",
    report_name="contact_timestamps",
    load_strategy="full_refresh"
)
# Returns: {"path": "s3://...", "headers": [...]}

Snowflake Tasks (snowflake_tasks.py)

COPY INTO Bronze with TRUNCATE + FORCE pattern, metadata column injection, and deduplication.

from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake

table_name = copy_to_snowflake(
    stage_path=stage_info["path"],
    headers=stage_info["headers"],
    database="WBP_DB_DEV",
    schema="BRONZE",
    table="GLADLY_CONTACT_TIMESTAMPS"
)
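The TRUNCATE + FORCE sequence can be sketched as a SQL builder. This is a hypothetical illustration (statement shapes and file-format options are assumptions; the real logic in snowflake_tasks.py also injects metadata columns and runs deduplication):

```python
def build_copy_statements(stage_path: str, headers: list[str],
                          database: str, schema: str, table: str) -> list[str]:
    """Sketch: TRUNCATE, then COPY INTO with FORCE = TRUE for idempotent reloads."""
    fq = f"{database}.{schema}.{table}"
    cols = ", ".join(headers)
    return [
        f"TRUNCATE TABLE {fq}",
        # FORCE = TRUE reloads files even if Snowflake has seen them before,
        # so re-running the task always produces the same table state.
        f"COPY INTO {fq} ({cols})\n"
        f"FROM '{stage_path}'\n"
        f"FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)\n"
        f"FORCE = TRUE",
    ]

stmts = build_copy_statements(
    "s3://xo-stage/warbyparker/contact_timestamps/full_refresh/2026-01-15/",
    ["CONTACT_ID", "TIMESTAMP_FIELD", "EVENT_TYPE"],
    "WBP_DB_DEV", "BRONZE", "GLADLY_CONTACT_TIMESTAMPS",
)
```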

Merge Tasks (merge_tasks.py)

For legacy_elt pipeline type -- MERGE operations into existing tables.

Task Utils (task_utils.py)

Shared utilities including task-level skip logic (ADR 007).

Load Strategies

Per ADR 001, three load strategies are supported:

| Strategy | Description | S3 Path Segment | Use Case |
| --- | --- | --- | --- |
| full_refresh | Immutable daily snapshots | full_refresh/{date}/ | Most common -- Gladly reports |
| incremental | Full pulls with warehouse dedup | incremental/{timestamp}/ | Google Sheets |
| historical | Late-arriving data, SCD Type 2 | historical/{date}/ | Avoid when possible |
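A sketch of how path_builder.py might assemble these segments (the bucket name and overall layout are assumptions pieced together from the examples elsewhere on this page):

```python
def build_stage_path(domain: str, report_name: str, load_strategy: str,
                     partition: str, bucket: str = "xo-stage") -> str:
    """Sketch: s3://{bucket}/{domain}/{report}/{strategy}/{date or timestamp}/"""
    # Fail fast on an unknown strategy, per the fail-fast design principle.
    if load_strategy not in {"full_refresh", "incremental", "historical"}:
        raise ValueError(f"unknown load_strategy: {load_strategy}")
    return f"s3://{bucket}/{domain}/{report_name}/{load_strategy}/{partition}/"

path = build_stage_path(
    "warbyparker", "contact_timestamps", "full_refresh", "2026-01-15"
)
```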

Time Window

Centralized time window calculation in time_window/calculator.py. Supports multiple modes:

| Mode | Description | Output Format |
| --- | --- | --- |
| Daily | Single date based on execution date minus lag | YYYY-MM-DD |
| Intraday Relative | Window from now minus lookback to now minus lag | ISO8601 timestamps |
| Intraday Absolute | Fixed start/end times | ISO8601 timestamps |
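The daily and intraday-relative modes reduce to pure date math. A simplified sketch (timezone handling and the absolute mode omitted; the real logic lives in time_window/calculator.py):

```python
from datetime import datetime, timedelta

def daily_window(execution_date: datetime, lag: timedelta) -> str:
    """Daily mode: a single YYYY-MM-DD date, execution date minus lag."""
    return (execution_date - lag).strftime("%Y-%m-%d")

def intraday_relative(now: datetime, lookback: timedelta, lag: timedelta):
    """Intraday relative mode: [now - lookback, now - lag] as ISO8601."""
    return (now - lookback).isoformat(), (now - lag).isoformat()

run = datetime(2026, 1, 16, 6, 50)
yesterday = daily_window(run, timedelta(days=1))
start, end = intraday_relative(run, timedelta(hours=8), timedelta(minutes=30))
```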

Copy-then-Peek Pattern

Located in s3_utils.py. Performance optimization for S3 → Snowflake pipelines:

  1. Copy file within S3 (S3-to-S3, fast -- no download)
  2. Range request for first 8KB to extract headers
  3. Generate column mappings for COPY INTO

from xo_foundry.s3_utils import copy_and_peek_s3_file

headers = copy_and_peek_s3_file(
    source_bucket="xo-ingest",
    source_key="warbyparker/contact_timestamps/2026-01-15/data.csv",
    dest_bucket="xo-stage",
    dest_key="warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv",
    peek_bytes=8192
)
# Returns: ['CONTACT_ID', 'TIMESTAMP_FIELD', 'EVENT_TYPE', ...]

Performance: Constant time (~0.5s) regardless of file size.
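The header peek itself comes down to parsing the first line out of a small byte range; in production those bytes would come from an S3 GetObject call with a `Range: bytes=0-8191` header (that call is assumed here, not shown). A stdlib-only sketch of the parsing step:

```python
import csv
import io

def headers_from_peek(peek: bytes, encoding: str = "utf-8") -> list[str]:
    """Extract CSV headers from the first bytes of a file.

    `peek` is assumed to contain at least one complete line (8KB is plenty
    for a header row); any trailing partial row is ignored.
    """
    first_line = peek.split(b"\n", 1)[0].decode(encoding).rstrip("\r")
    return next(csv.reader(io.StringIO(first_line)))

# First 8KB of a staged CSV, truncated mid-row -- only the header matters.
chunk = b"CONTACT_ID,TIMESTAMP_FIELD,EVENT_TYPE\n00123,2026-01-15T0"
headers = headers_from_peek(chunk)
```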

Critical Rules

No Pandas in Extraction

Pandas corrupts data via automatic type inference (IDs become floats, leading zeros stripped, dates become Timestamps). All extraction tasks use native Python csv.DictWriter. Let Snowflake handle type conversion.

| Layer | Pandas | Reason |
| --- | --- | --- |
| xo-foundry (extraction) | Never | Preserve source data exactly |
| xo-core (transformation) | Only with explicit dtypes | Controlled transforms |
| Snowflake (loading) | Let Snowflake handle types | Warehouse is schema of truth |
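The corruption risk is concrete: a default pandas read would turn an ID like "00123" into the integer 123. The csv module never infers types, so values round-trip exactly (a minimal demonstration with hypothetical column names):

```python
import csv
import io

rows = [{"ORDER_ID": "00123", "ZIP": "07302"}]  # leading zeros must survive

# Write with csv.DictWriter, as all xo-foundry extraction tasks do.
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["ORDER_ID", "ZIP"])
writer.writeheader()
writer.writerows(rows)

# Read back: every field is still a plain string, untouched by type inference.
back = list(csv.DictReader(io.StringIO(buf.getvalue())))
```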

JSON Column Handling

When a source has JSON columns (e.g., Sprout API's activity_metadata), specify json_columns in YAML config to enable JSON normalization in RECORD_HASH calculations:

sources:
  sprout_messages:
    json_columns:
      - activity_metadata

This prevents phantom updates from key ordering differences in JSON values.
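The normalization step can be sketched with the stdlib: re-serialize with sorted keys before hashing, so two payloads that differ only in key order hash identically (the exact hashing used by xo-foundry may differ):

```python
import hashlib
import json

def normalized_json_hash(raw: str) -> str:
    """Hash a JSON value independent of its key ordering."""
    canonical = json.dumps(json.loads(raw), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Same payload, different key order: identical hash, so no phantom update.
a = normalized_json_hash('{"channel": "twitter", "id": 1}')
b = normalized_json_hash('{"id": 1, "channel": "twitter"}')
```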

Task-Level Skip Logic (ADR 007)

Runtime toggles are handled inside task functions via **context params, not by conditional task creation:

@task
def extract_data(**context):
    if context["params"].get("skip_extract"):
        logger.info("Skipping extraction per runtime param")
        return None
    # ... normal extraction logic

Dependencies

xo-foundry
├── xo-core              # Foundation utilities
├── apache-airflow>=3.0  # Orchestration
├── pydantic             # Configuration validation
├── jinja2               # Template rendering
├── click                # CLI framework
├── pyyaml               # YAML parsing
└── boto3                # AWS S3 operations

Quick Example

A complete pipeline using xo-foundry tasks:

from airflow.decorators import dag, task
from xo_foundry.tasks.extract_tasks import extract_gladly_data
from xo_foundry.tasks.stage_tasks import copy_and_standardize
from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake
from datetime import datetime

@dag(
    schedule="50 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False
)
def warbyparker_gladly_daily_dag():
    """Extract Gladly reports → S3 → Snowflake BRONZE"""

    # Extract from Gladly API (uses csv.DictWriter, NOT pandas)
    ingest = extract_gladly_data(
        metric_set="ContactTimestampsReport",
        domain="warbyparker",
        report_name="contact_timestamps"
    )

    # Stage: S3 Ingest → S3 Stage (Copy-then-Peek)
    stage = copy_and_standardize(
        ingest_path=ingest,
        domain="warbyparker",
        report_name="contact_timestamps",
        load_strategy="full_refresh"
    )

    # Load: TRUNCATE + COPY INTO Bronze
    load = copy_to_snowflake(
        stage_path=stage["path"],
        headers=stage["headers"],
        database="WBP_DB_DEV",
        schema="BRONZE",
        table="GLADLY_CONTACT_TIMESTAMPS"
    )

# Register DAG
warbyparker_gladly_daily_dag()

Design Philosophy

| Principle | Description |
| --- | --- |
| Modularity | Tasks are composable and reusable across pipelines |
| Configuration over Code | Pipelines defined through YAML, not Python |
| Fail-Fast | Tasks validate inputs and fail early with clear errors |
| Observability | All tasks log exact values (full ISO8601 timestamps, API filters) |
| Idempotency | Every run produces the same result (TRUNCATE + FORCE) |

Testing

# Run xo-foundry tests
uv run pytest packages/xo-foundry/tests/

# Type checking
uv run ty check --project packages/xo-foundry

Package Location: packages/xo-foundry/
Version: 0.10.0
Dependencies: See packages/xo-foundry/pyproject.toml