Playbook: Adding a New Data Source Extractor¶
Purpose: Step-by-step guide to add a new API or file-based extractor to the platform. Follow in order — each step builds on the previous.
Overview¶
Adding a new extractor involves three layers:

1. xo-core — the extractor class (API client + data fetching logic)
2. xo-foundry — the Airflow task wrapper
3. DAG Factory — YAML config + schema update (if needed)
Phase 1 — xo-core: Extractor Class¶
Step 1.1 — Create the extractor file¶
Location: packages/xo-core/src/xo_core/extractors/{source_name}.py
```python
from __future__ import annotations

import logging

import requests

from xo_core.extractors.base import BaseExtractor

logger = logging.getLogger(__name__)


class MyApiExtractor(BaseExtractor):
    """Extracts records from MyAPI.

    Returns records as list[dict[str, str]] — always strings, no type inference.
    """

    BASE_URL = "https://api.myservice.com/v1"

    def __init__(self, api_key: str, org: str) -> None:
        self.api_key = api_key
        self.org = org
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        })

    def extract(
        self,
        start_date: str,
        end_date: str,
        endpoint: str = "records",
    ) -> list[dict[str, str]]:
        """Extract records for the given date window.

        Args:
            start_date: ISO8601 start (inclusive)
            end_date: ISO8601 end (exclusive)
            endpoint: API endpoint to hit

        Returns:
            List of records as string dicts (never parsed types)
        """
        logger.info(f"Fetching {endpoint}: start={start_date} end={end_date}")
        url = f"{self.BASE_URL}/{self.org}/{endpoint}"
        params = {"from": start_date, "to": end_date}
        # Explicit timeout so a stalled API call fails fast instead of hanging the task
        response = self.session.get(url, params=params, timeout=60)
        response.raise_for_status()
        data = response.json()
        records = data.get("items", [])
        # Convert all values to strings — never leave as native Python types
        return [
            {str(k): str(v) if v is not None else "" for k, v in r.items()}
            for r in records
        ]
```
Critical rules for the extractor class:
- Return list[dict[str, str]] — strings only, no type inference
- Log exact parameter values (not summaries)
- Use raise_for_status() — never swallow HTTP errors
- Never import pandas
- Never do type conversion (dates, ints, booleans) — leave as strings
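The string-only rule can be demonstrated in isolation. The payload below is made up; the conversion expression is the same one used in `extract()` above:

```python
# Demonstration of the string-only conversion rule, using a made-up payload.
raw = [{"id": 12345, "amount": 19.99, "active": True, "deleted_at": None}]

stringified = [
    {str(k): str(v) if v is not None else "" for k, v in r.items()}
    for r in raw
]

# IDs stay exact ("12345", never 12345.0), booleans become "True"/"False",
# and None becomes the empty string.
print(stringified)
# → [{'id': '12345', 'amount': '19.99', 'active': 'True', 'deleted_at': ''}]
```

This is exactly the artifact class that pandas-based loading introduces (integer IDs round-tripped through float64), which is why the extractor converts at the edge.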
Step 1.2 — Export from the package¶
Add to packages/xo-core/src/xo_core/extractors/__init__.py:
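A minimal sketch of the export, assuming the package re-exports extractors from this module (match the style of the existing entries):

```python
from xo_core.extractors.my_api import MyApiExtractor

__all__ = [
    # ...existing exports...
    "MyApiExtractor",
]
```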
Step 1.3 — Type check and lint¶
uv sync --package xo-core
uv run ty check packages/xo-core
uv run ruff check packages/xo-core
uv run ruff format packages/xo-core
All three must pass with zero errors.
Phase 2 — xo-foundry: Airflow Task Wrapper¶
Step 2.1 — Add the task function¶
Add to packages/xo-foundry/src/xo_foundry/tasks/extract_tasks.py:
```python
@task
def extract_myapi_records(
    start_date: str,
    end_date: str,
    endpoint: str = "records",
    **context: Any,
) -> str:
    """Extract records from MyAPI and write to S3 ingest path.

    Returns the S3 ingest path for the staged file.
    """
    from xo_core.extractors.my_api import MyApiExtractor
    from xo_foundry.s3_utils import write_csv_to_s3

    api_key = Variable.get("MYAPI_KEY")
    org = Variable.get("MYAPI_ORG")
    extractor = MyApiExtractor(api_key=api_key, org=org)
    records = extractor.extract(start_date=start_date, end_date=end_date, endpoint=endpoint)
    logger.info(f"Extracted {len(records)} records from {endpoint}")

    # Write to S3 using csv.DictWriter — never pandas
    s3_path = write_csv_to_s3(
        records=records,
        bucket=Variable.get("INGEST_BUCKET"),
        key=f"myservice/{endpoint}/full_refresh/{start_date}/data.csv",
    )
    logger.info(f"Written to S3: {s3_path}")
    return s3_path
```
Critical rules for the task wrapper:
- Use csv.DictWriter via write_csv_to_s3 — never pandas
- Pull credentials from Airflow Variables, not hardcoded
- Return the S3 path string for downstream tasks
- Log exact counts and paths
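`write_csv_to_s3` lives in `xo_foundry.s3_utils`; the sketch below shows only the `csv.DictWriter` serialization a helper like it presumably performs before handing bytes to S3 (e.g. via a boto3 `put_object` call). The function name and behavior here are illustrative, not the real helper:

```python
import csv
import io


def build_csv_payload(records: list[dict[str, str]]) -> str:
    """Serialize string-dict records to a CSV string with csv.DictWriter.

    Illustrative sketch only: the real write_csv_to_s3 helper also uploads
    the result to the ingest bucket and returns the S3 path.
    """
    if not records:
        return ""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(records[0].keys()))
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()


payload = build_csv_payload([{"id": "1", "name": "a"}, {"id": "2", "name": "b"}])
print(payload)
```

Because the extractor already returns string dicts, no dtype handling is needed at this layer, which is the point of banning pandas here.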
Step 2.2 — Type check xo-foundry¶
uv sync --package xo-foundry
uv run ty check packages/xo-foundry
uv run ruff check packages/xo-foundry
Phase 3 — DAG Factory: Schema + Config¶
Step 3.1 — Add YAML schema fields (if new config keys needed)¶
If the new extractor needs new YAML keys, add Pydantic fields to:
packages/xo-foundry/src/xo_foundry/schemas/dag_config.py
```python
class MyApiSourceConfig(BaseModel):
    extractor: Literal["myapi_records"]
    endpoint: str = "records"
    load_strategy: LoadStrategy
    snowflake_table: str
    snowflake_database: str
    snowflake_schema: str = "BRONZE"
```
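`LoadStrategy` is defined elsewhere in `dag_config.py`; the self-contained sketch below uses a stand-in `Literal` for it (an assumption about its shape) to show how a parsed YAML source block validates against the model:

```python
from typing import Literal

from pydantic import BaseModel

# Stand-in for the real LoadStrategy type in dag_config.py (assumption:
# the real one enumerates the supported strategies).
LoadStrategy = Literal["full_refresh", "incremental"]


class MyApiSourceConfig(BaseModel):
    extractor: Literal["myapi_records"]
    endpoint: str = "records"
    load_strategy: LoadStrategy
    snowflake_table: str
    snowflake_database: str
    snowflake_schema: str = "BRONZE"


# A parsed YAML "sources" entry validates straight into the model;
# defaulted fields (endpoint, snowflake_schema) may be omitted.
cfg = MyApiSourceConfig(
    extractor="myapi_records",
    load_strategy="full_refresh",
    snowflake_table="MYAPI_RECORDS",
    snowflake_database="ACME_DB",
)
print(cfg.endpoint, cfg.snowflake_schema)
# → records BRONZE
```

The `Literal["myapi_records"]` discriminator is what lets the factory dispatch a YAML block to this model by its `extractor` key.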
Step 3.2 — Create a template YAML config¶
Create: packages/xo-foundry/configs/myapi-pipeline-template.yaml
```yaml
# Template config for MyAPI pipelines
# Copy to apps/airflow/xo-pipelines/dags/configs/{client}-myapi-daily.yaml for production

dag:
  dag_id: {client}_myapi_daily
  schedule: "0 5 * * 1-6"
  catchup: false
  tags: ["{client}", "myapi", "daily"]

time_window:
  refresh_type: daily
  timezone: America/New_York
  daily_lag_minutes: 1440

sources:
  records:
    extractor: myapi_records
    endpoint: records
    load_strategy: full_refresh
    snowflake_table: MYAPI_RECORDS
    snowflake_database: "{CLIENT}_DB"
    snowflake_schema: BRONZE
```
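The `{client}`/`{CLIENT}` placeholders are plain brace tokens, so rendering a client copy can be as simple as `str.format`. This is a sketch of the substitution step only, on an abbreviated template; the real factory may render differently:

```python
# Render the {client}/{CLIENT} placeholders for one client.
# Abbreviated template; the substitution mechanism is an assumption.
template = """\
dag:
  dag_id: {client}_myapi_daily
  tags: ["{client}", "myapi", "daily"]

sources:
  records:
    snowflake_database: "{CLIENT}_DB"
"""

rendered = template.format(client="acme", CLIENT="ACME")
print(rendered)
```

Note the two casings are deliberate: lowercase for DAG IDs and tags, uppercase for Snowflake identifiers.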
Step 3.3 — Validate the template¶
uv run xo-foundry validate-config \
--config packages/xo-foundry/configs/myapi-pipeline-template.yaml
Phase 4 — Integration Test¶
Step 4.1 — Test the extractor in isolation¶
uv run xo-foundry test-extractor \
--extractor myapi_records \
--start-date 2026-02-01 \
--end-date 2026-02-02
Verify:
- Records returned (check count)
- All values are strings
- No type inference artifacts (IDs not floated, dates not parsed)
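These checks can be scripted against the extractor's output. A quick ad-hoc sketch (the `record_id` field name and the `.0` heuristic are illustrative, not part of the platform):

```python
def check_extract_output(records: list[dict]) -> None:
    """Sanity-check extractor output: strings only, no float-ed IDs.

    Ad-hoc verification sketch; field names here are hypothetical.
    """
    assert records, "no records returned"
    for r in records:
        for k, v in r.items():
            assert isinstance(k, str) and isinstance(v, str), f"non-string at {k!r}"
            # A value like "12345.0" in an ID-ish column suggests a float
            # round-trip somewhere upstream (a classic pandas artifact).
            if k.lower().endswith("id"):
                assert not v.endswith(".0"), f"float artifact in {k!r}: {v!r}"


check_extract_output([{"record_id": "12345", "amount": "19.99"}])
print("checks passed")
```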
Step 4.2 — Test the full pipeline locally¶
cd apps/airflow/xo-pipelines
astro dev start
# Trigger the new DAG in Airflow UI
# Check Bronze table in Snowflake DEV
astro dev stop
Verify:
- RECORD_KEY and RECORD_HASH populated correctly
- DATE_TO_WAREHOUSE is set
- Row counts match expected output
- No NULL columns that should have values
Phase 5 — Documentation & Deploy¶
Step 5.1 — Update the extractor testing guide¶
If you discovered anything non-obvious during testing, add it to:
guides/extractor-testing-workflow.md
Step 5.2 — Deploy¶
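The checklist below targets Astronomer. Assuming the standard Astronomer CLI workflow, deployment is typically run from the Airflow project directory; the command below is a sketch, so follow your team's actual deploy process:

```shell
cd apps/airflow/xo-pipelines
astro deploy   # pushes the project image to the configured Astronomer deployment
```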
Step 5.3 — Update the Snowflake object inventory¶
Add the new Bronze table to:
reference/snowflake-object-inventory.md
Checklist Summary¶
- Extractor class created in xo-core/extractors/
- Returns list[dict[str, str]] — strings only
- No pandas imports in xo-foundry
- ty check packages/xo-core passes
- Task wrapper created in xo-foundry/tasks/extract_tasks.py
- Credentials from Airflow Variables (not hardcoded)
- ty check packages/xo-foundry passes
- YAML template created in xo-foundry/configs/
- validate-config passes on template
- Integration test in local Airflow passes
- Bronze table schema added to Snowflake object inventory
- Deployed to Astronomer
See also: extractor-testing-workflow.md