xo-foundry Package¶
xo-foundry v0.10.0 -- The orchestration layer for the XO-Data platform. Provides a DAG Factory for generating Airflow DAGs from YAML configuration, a modular task library, time window management, and CLI tools for pipeline operations.
Installation¶
# Install xo-foundry package
uv sync --package xo-foundry
# Add as dependency to another package
uv add --package xo-pipelines xo-foundry
Package Structure¶
packages/xo-foundry/src/xo_foundry/
├── cli/ # CLI tools
│ ├── generate_dags.py # xo-foundry generate-dag
│ └── test_extractor.py # xo-foundry test-extractor
│
├── dag_factory/ # YAML → Python DAG generation
│ ├── factory.py # DAGFactory class (Jinja2 rendering)
│ ├── builders/
│ │ └── path_builder.py # S3 path construction
│ └── templates/ # Jinja2 DAG templates
│
├── schemas/ # Pydantic configuration models
│ ├── dag_config.py # DAGConfig, SourceConfig, TimeWindowConfig
│ └── pipeline_config.py # Legacy pipeline config
│
├── tasks/ # Airflow task library
│ ├── extract_tasks.py # Gladly, Sprout, GSheet extraction
│ ├── stage_tasks.py # S3 staging/transformation
│ ├── snowflake_tasks.py # COPY INTO, dedup, metadata
│ ├── merge_tasks.py # Merge operations (legacy_elt)
│ └── task_utils.py # Shared task utilities
│
├── time_window/ # Time window management
│ ├── calculator.py # Central time window logic
│ └── duration_parser.py # Parse duration strings
│
├── s3_utils.py # Copy-then-Peek pattern
└── __init__.py
DAG Factory¶
The DAG Factory generates production-ready Airflow DAGs from YAML configuration files. It uses Pydantic for validation and Jinja2 for template rendering.
How It Works¶
- Validate: YAML is parsed and validated against the DAGConfig Pydantic model
- Build paths: S3 paths are constructed by path_builder.py based on domain, report, and load strategy
- Render: Jinja2 templates generate complete Python DAG files with TaskFlow API decorators
- Output: Production-ready .py files are placed in the DAGs directory
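The validate → build paths → render flow can be sketched with the standard library alone. This is only an illustration of the shape of the steps: the real DAGFactory uses Pydantic models and Jinja2 templates, and MiniDAGConfig, build_s3_path, and the bucket name here are hypothetical.

```python
# Minimal stdlib-only sketch of the validate -> build paths -> render flow.
# The real factory uses Pydantic validation and Jinja2 rendering.
from dataclasses import dataclass


@dataclass
class MiniDAGConfig:
    domain: str
    pipeline_name: str
    schedule: str


def validate(raw: dict) -> MiniDAGConfig:
    # Step 1: fail fast on configs missing required keys
    missing = {"domain", "pipeline_name", "schedule"} - raw.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    return MiniDAGConfig(**raw)


def build_s3_path(cfg: MiniDAGConfig, report: str, load_strategy: str) -> str:
    # Step 2: deterministic path from domain, report, and load strategy
    return f"s3://xo-stage/{cfg.domain}/{report}/{load_strategy}/"


DAG_TEMPLATE = '''@dag(schedule="{schedule}", catchup=False)
def {domain}_{pipeline_name}_dag():
    ...
'''


def render(cfg: MiniDAGConfig) -> str:
    # Step 3: render a complete DAG file body from a template
    return DAG_TEMPLATE.format(
        schedule=cfg.schedule, domain=cfg.domain, pipeline_name=cfg.pipeline_name
    )


cfg = validate(
    {"domain": "warbyparker", "pipeline_name": "gladly_daily", "schedule": "50 6 * * *"}
)
path = build_s3_path(cfg, "contact_timestamps", "full_refresh")
dag_source = render(cfg)
```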
CLI Commands¶
# Generate DAG from YAML config
uv run xo-foundry generate-dag \
--config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml \
--output apps/airflow/xo-pipelines/dags/
# Validate config without generating
uv run xo-foundry validate-config \
--config apps/airflow/xo-pipelines/dags/configs/warbyparker-gladly-daily.yaml
Pipeline Types¶
| Type | Description | Use Case |
|---|---|---|
| `snowflake_load` | Extract → Stage → COPY INTO Bronze | Standard daily/intraday pipelines |
| `intraday_refresh` | Multiple refreshes per day with overlapping windows | Near-real-time data |
| `legacy_elt` | Extract → Merge into existing tables | OPERATIONS database migrations |
YAML Configuration Example¶
Based on the production warbyparker-gladly-daily.yaml:
dag:
  domain: warbyparker
  pipeline_name: gladly_daily
  pipeline_type: snowflake_load
  schedule: "50 6 * * *"  # 6:50 AM EST daily
  time_window:
    refresh_type: daily
    lag:
      days: 1  # Extract yesterday's data
    timezone: "America/New_York"

globals:
  snowflake:
    database: WBP_DB_DEV
    schema: BRONZE
    connection_id: snowflake_dev
    warehouse: XO_DEV_WH

sources:
  contact_timestamps:
    source_type: gladly_api
    load_strategy: full_refresh
    extractor:
      metric_set: ContactTimestampsReport
      base_url_var: WARBYPARKER_GLADLY_BASE_URL
      email_var: WARBYPARKER_GLADLY_EMAIL
      token_var: WARBYPARKER_GLADLY_API_TOKEN
    paths:
      report_name: contact_timestamps
      filename_pattern: "contact_timestamps_{date}.csv"
    snowflake:
      target_table: GLADLY_CONTACT_TIMESTAMPS
      load_strategy: truncate_insert
      deduplication:
        strategy: composite_key
        unique_columns:
          - CONTACT_ID
          - TIMESTAMP_FIELD
          - EVENT_TYPE
        use_hash: true
Production configs location
Production YAML configs live in apps/airflow/xo-pipelines/dags/configs/ (source of truth). The packages/xo-foundry/configs/ directory is for examples and templates only.
Pydantic Schema Reference¶
The configuration schema is defined in packages/xo-foundry/src/xo_foundry/schemas/dag_config.py.
Key Models¶
| Model | Purpose |
|---|---|
| `DAGConfig` | Root config: dag + globals + sources |
| `DAGMetadata` | Domain, pipeline_name, pipeline_type, schedule, time_window |
| `TimeWindowConfig` | Daily/intraday modes, lag, lookback, timezone |
| `LagConfig` | Structured lag: `{days: 1}`, `{hours: 8}`, `{minutes: 30}` |
| `GlobalConfig` | Shared Snowflake settings (database, schema, warehouse) |
| `SourceConfig` | Per-source: source_type, load_strategy, extractor, paths, snowflake |
| `ExtractorConfig` | Polymorphic: Gladly, Sprout, GSheet, S3, Gmail extractors |
| `DeduplicationConfig` | Strategy, unique_columns, use_hash |
TimeWindowConfig¶
# Daily (extract yesterday)
time_window:
  refresh_type: daily
  lag: { days: 1 }
  timezone: "America/New_York"

# Daily (no lag, current day)
time_window:
  refresh_type: daily
  timezone: "America/New_York"

# Intraday (relative, 8hr lookback)
time_window:
  refresh_type: intraday
  lag: { minutes: 30 }
  lookback: "8 hours"
  timezone: "America/New_York"

# Intraday (absolute times)
time_window:
  refresh_type: intraday
  start_time: "2026-01-15T17:00:00"
  end_time: "2026-01-16T09:00:00"
  timezone: "America/New_York"
LagConfig structure
The lag field uses a structured LagConfig model (dict with days/hours/minutes), not string durations. Omitting lag means zero lag (current day).
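The structured lag model can be sketched as a small dataclass. Field names (days/hours/minutes) come from the docs above; the `as_timedelta` helper and defaults are illustrative assumptions, not the package's actual API.

```python
# Illustrative sketch of the structured LagConfig described above.
# as_timedelta is a hypothetical helper, not xo-foundry's real API.
from dataclasses import dataclass
from datetime import timedelta


@dataclass
class LagConfig:
    days: int = 0
    hours: int = 0
    minutes: int = 0

    def as_timedelta(self) -> timedelta:
        return timedelta(days=self.days, hours=self.hours, minutes=self.minutes)


# Omitting lag means zero lag (current day)
assert LagConfig().as_timedelta() == timedelta(0)
# {days: 1} in YAML maps to LagConfig(days=1)
assert LagConfig(days=1).as_timedelta() == timedelta(days=1)
```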
Task Library¶
Extract Tasks (extract_tasks.py)¶
Extraction tasks for various data sources. All extraction uses native Python csv.DictWriter -- never pandas.
from xo_foundry.tasks.extract_tasks import extract_gladly_data

# In a DAG
@dag(schedule="50 6 * * *", catchup=False)
def warbyparker_gladly_daily_dag():
    ingest_path = extract_gladly_data(
        metric_set="ContactTimestampsReport",
        domain="warbyparker",
        report_name="contact_timestamps",
        **context
    )
Supported extractors: Gladly API, Sprout Social API, Google Sheets, Gmail, S3
Stage Tasks (stage_tasks.py)¶
S3-to-S3 staging with column standardization via Copy-then-Peek.
from xo_foundry.tasks.stage_tasks import copy_and_standardize

stage_info = copy_and_standardize(
    ingest_path=ingest_path,
    domain="warbyparker",
    report_name="contact_timestamps",
    load_strategy="full_refresh",
)
# Returns: {"path": "s3://...", "headers": [...]}
Snowflake Tasks (snowflake_tasks.py)¶
COPY INTO Bronze with TRUNCATE + FORCE pattern, metadata column injection, and deduplication.
from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake

table_name = copy_to_snowflake(
    stage_path=stage_info["path"],
    headers=stage_info["headers"],
    database="WBP_DB_DEV",
    schema="BRONZE",
    table="GLADLY_CONTACT_TIMESTAMPS",
)
Merge Tasks (merge_tasks.py)¶
For legacy_elt pipeline type -- MERGE operations into existing tables.
Task Utils (task_utils.py)¶
Shared utilities including task-level skip logic (ADR 007).
Load Strategies¶
Per ADR 001, three load strategies are supported:
| Strategy | Description | S3 Path Segment | Use Case |
|---|---|---|---|
| `full_refresh` | Immutable daily snapshots | `full_refresh/{date}/` | Most common -- Gladly reports |
| `incremental` | Full pulls with warehouse dedup | `incremental/{timestamp}/` | Google Sheets |
| `historical` | Late-arriving data, SCD Type 2 | `historical/{date}/` | Avoid when possible |
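The path segments in the table map mechanically onto S3 prefixes. The sketch below shows one plausible shape for that mapping; `build_stage_path` and the bucket name are hypothetical, not the real `path_builder.py` API.

```python
# Hypothetical sketch of how a path builder might assemble stage paths
# from the strategy table above. Not xo-foundry's actual API.
def build_stage_path(domain: str, report: str, strategy: str, partition: str) -> str:
    if strategy not in {"full_refresh", "incremental", "historical"}:
        raise ValueError(f"unknown load strategy: {strategy}")
    # full_refresh/historical partition by date; incremental by timestamp
    return f"s3://xo-stage/{domain}/{report}/{strategy}/{partition}/"


path = build_stage_path("warbyparker", "contact_timestamps", "full_refresh", "2026-01-15")
# -> s3://xo-stage/warbyparker/contact_timestamps/full_refresh/2026-01-15/
```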
Time Window¶
Centralized time window calculation in time_window/calculator.py. Supports multiple modes:
| Mode | Description | Output Format |
|---|---|---|
| Daily | Single date based on execution date minus lag | YYYY-MM-DD |
| Intraday Relative | Window from now minus lookback to now minus lag | ISO8601 timestamps |
| Intraday Absolute | Fixed start/end times | ISO8601 timestamps |
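The two relative modes from the table can be sketched with stdlib datetimes. This ignores timezone handling and the absolute mode, which the real `calculator.py` covers; the function names are illustrative.

```python
# Stdlib sketch of the daily and intraday-relative window modes above.
# Function names are illustrative; the real calculator handles timezones.
from datetime import datetime, timedelta


def daily_window(execution_date: datetime, lag: timedelta) -> str:
    # Daily: a single date, execution date minus lag (YYYY-MM-DD)
    return (execution_date - lag).strftime("%Y-%m-%d")


def intraday_relative(now: datetime, lookback: timedelta, lag: timedelta):
    # Intraday relative: [now - lookback, now - lag] as ISO8601 timestamps
    return ((now - lookback).isoformat(), (now - lag).isoformat())


run = datetime(2026, 1, 16, 6, 50)
assert daily_window(run, timedelta(days=1)) == "2026-01-15"

start, end = intraday_relative(run, timedelta(hours=8), timedelta(minutes=30))
assert start == "2026-01-15T22:50:00"
assert end == "2026-01-16T06:20:00"
```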
Copy-then-Peek Pattern¶
Located in s3_utils.py. Performance optimization for S3 → Snowflake pipelines:
- Copy file within S3 (S3-to-S3, fast -- no download)
- Range request for first 8KB to extract headers
- Generate column mappings for COPY INTO
from xo_foundry.s3_utils import copy_and_peek_s3_file

headers = copy_and_peek_s3_file(
    source_bucket="xo-ingest",
    source_key="warbyparker/contact_timestamps/2026-01-15/data.csv",
    dest_bucket="xo-stage",
    dest_key="warbyparker/contact_timestamps/full_refresh/2026-01-15/data.csv",
    peek_bytes=8192,
)
# Returns: ['CONTACT_ID', 'TIMESTAMP_FIELD', 'EVENT_TYPE', ...]
Performance: Constant time (~0.5s) regardless of file size.
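The "peek" half of the pattern is what makes this constant-time: only the first `peek_bytes` of the object are fetched (on S3, via a ranged GET such as `Range: bytes=0-8191`) and the header row is parsed from that prefix. A stdlib-only sketch of that parsing step, with a hypothetical `peek_headers` helper:

```python
# Sketch of header extraction from a peeked byte range. peek_headers is
# illustrative; the real implementation lives in s3_utils.py and issues
# the ranged GET through boto3.
import csv
import io


def peek_headers(first_bytes: bytes) -> list[str]:
    text = first_bytes.decode("utf-8", errors="replace")
    # The header row must fit inside the peeked range (8KB is generous)
    first_line = text.splitlines()[0]
    return next(csv.reader(io.StringIO(first_line)))


sample = b"CONTACT_ID,TIMESTAMP_FIELD,EVENT_TYPE\r\n001,2026-01-15T00:00:00,created\r\n"
assert peek_headers(sample[:8192]) == ["CONTACT_ID", "TIMESTAMP_FIELD", "EVENT_TYPE"]
```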
Critical Rules¶
No Pandas in Extraction¶
Pandas corrupts data via automatic type inference (IDs become floats, leading zeros stripped, dates become Timestamps). All extraction tasks use native Python csv.DictWriter. Let Snowflake handle type conversion.
| Layer | Pandas | Reason |
|---|---|---|
| xo-foundry (extraction) | Never | Preserve source data exactly |
| xo-core (transformation) | Only with explicit dtypes | Controlled transforms |
| Snowflake (loading) | Let Snowflake handle types | Warehouse is schema of truth |
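The rule above is easy to demonstrate: with `csv.DictWriter`, values pass through as strings, so IDs keep their leading zeros, whereas pandas' default type inference would read a column like `00123` as the integer 123. A minimal example of the safe path:

```python
# Why csv.DictWriter is safe for extraction: values are written exactly
# as received, with no type inference. (pandas.read_csv would by default
# turn "00123" into 123 and strip the leading zeros.)
import csv
import io

rows = [{"order_id": "00123", "zip": "07030"}]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["order_id", "zip"])
writer.writeheader()
writer.writerows(rows)

# Leading zeros survive byte-for-byte (csv uses \r\n line endings)
assert buf.getvalue() == "order_id,zip\r\n00123,07030\r\n"
```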
JSON Column Handling¶
When a source has JSON columns (e.g., Sprout API's activity_metadata), specify json_columns in the YAML config to enable JSON normalization in RECORD_HASH calculations. This prevents phantom updates caused by key-ordering differences in JSON values.
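The mechanism behind this can be sketched in a few lines: the same JSON object serialized with different key orders must hash identically, so the value is canonicalized before hashing. The exact hashing scheme here (`sha256` over `json.dumps(..., sort_keys=True)`) is an illustrative assumption, not necessarily the package's implementation.

```python
# Sketch of JSON normalization for RECORD_HASH. Hashing scheme is
# illustrative; the point is canonicalization before hashing.
import hashlib
import json


def record_hash(value: str, is_json: bool) -> str:
    if is_json:
        # Canonicalize: sorted keys, no insignificant whitespace
        value = json.dumps(json.loads(value), sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(value.encode()).hexdigest()


a = '{"channel": "twitter", "likes": 5}'
b = '{"likes": 5, "channel": "twitter"}'

# Same object, different key order: identical hash -> no phantom update
assert record_hash(a, is_json=True) == record_hash(b, is_json=True)
# Without normalization the raw strings differ, so the hashes differ
assert record_hash(a, is_json=False) != record_hash(b, is_json=False)
```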
Task-Level Skip Logic (ADR 007)¶
Runtime toggles are handled inside task functions via **context params, not by conditional task creation:
@task
def extract_data(**context):
    if context["params"].get("skip_extract"):
        logger.info("Skipping extraction per runtime param")
        return None
    # ... normal extraction logic
Dependencies¶
xo-foundry
├── xo-core # Foundation utilities
├── apache-airflow>=3.0 # Orchestration
├── pydantic # Configuration validation
├── jinja2 # Template rendering
├── click # CLI framework
├── pyyaml # YAML parsing
└── boto3 # AWS S3 operations
Quick Example¶
A complete pipeline using xo-foundry tasks:
from datetime import datetime

from airflow.decorators import dag, task
from xo_foundry.tasks.extract_tasks import extract_gladly_data
from xo_foundry.tasks.stage_tasks import copy_and_standardize
from xo_foundry.tasks.snowflake_tasks import copy_to_snowflake


@dag(
    schedule="50 6 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def warbyparker_gladly_daily_dag():
    """Extract Gladly reports → S3 → Snowflake BRONZE"""
    # Extract from Gladly API (uses csv.DictWriter, NOT pandas)
    ingest = extract_gladly_data(
        metric_set="ContactTimestampsReport",
        domain="warbyparker",
        report_name="contact_timestamps",
    )

    # Stage: S3 Ingest → S3 Stage (Copy-then-Peek)
    stage = copy_and_standardize(ingest)

    # Load: TRUNCATE + COPY INTO Bronze
    load = copy_to_snowflake(stage)


# Register DAG
warbyparker_gladly_daily_dag()
Design Philosophy¶
| Principle | Description |
|---|---|
| Modularity | Tasks are composable and reusable across pipelines |
| Configuration over Code | Pipelines defined through YAML, not Python |
| Fail-Fast | Tasks validate inputs and fail early with clear errors |
| Observability | All tasks log exact values (full ISO8601 timestamps, API filters) |
| Idempotency | Every run produces the same result (TRUNCATE + FORCE) |
Testing¶
# Run xo-foundry tests
uv run pytest packages/xo-foundry/tests/
# Type checking
uv run ty check --project packages/xo-foundry
Related Documentation¶
- DAG Factory Guide -- Detailed DAG Factory walkthrough
- ELT Pipeline Flow -- Pipeline architecture
- ELT Layer Architecture -- Layer responsibilities
- xo-core Package -- Foundation utilities
- Architecture Decisions -- ADRs shaping design
Package Location: packages/xo-foundry/
Version: 0.10.0
Dependencies: See packages/xo-foundry/pyproject.toml