# Playbook: Adding a New Data Source Extractor

Purpose: Step-by-step guide to adding a new API or file-based extractor to the platform. Follow the phases in order — each step builds on the previous one.
## Overview

Adding a new extractor involves three layers:

1. xo-core — the extractor class (API client + data-fetching logic)
2. xo-foundry — the Airflow task wrapper
3. DAG Factory — YAML config + schema update (if needed)
## Phase 1 — xo-core: Extractor Class

### Step 1.1 — Create the extractor file

Location: `packages/xo-core/src/xo_core/extractors/{source_name}.py`
```python
from __future__ import annotations

import logging

import requests

from xo_core.extractors.base import BaseExtractor

logger = logging.getLogger(__name__)


class MyApiExtractor(BaseExtractor):
    """Extracts records from MyAPI.

    Returns records as list[dict[str, str]] — always strings, no type inference.
    """

    BASE_URL = "https://api.myservice.com/v1"

    def __init__(self, api_key: str, org: str) -> None:
        self.api_key = api_key
        self.org = org
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        })

    def extract(
        self,
        start_date: str,
        end_date: str,
        endpoint: str = "records",
    ) -> list[dict[str, str]]:
        """Extract records for the given date window.

        Args:
            start_date: ISO8601 start (inclusive)
            end_date: ISO8601 end (exclusive)
            endpoint: API endpoint to hit

        Returns:
            List of records as string dicts (never parsed types)
        """
        logger.info(f"Fetching {endpoint}: start={start_date} end={end_date}")
        url = f"{self.BASE_URL}/{self.org}/{endpoint}"
        params = {"from": start_date, "to": end_date}
        response = self.session.get(url, params=params)
        response.raise_for_status()
        data = response.json()
        records = data.get("items", [])
        # Convert all values to strings — never leave native Python types
        return [
            {str(k): str(v) if v is not None else "" for k, v in r.items()}
            for r in records
        ]
```
Critical rules for the extractor class:
- Return list[dict[str, str]] — strings only, no type inference
- Log exact parameter values (not summaries)
- Use raise_for_status() — never swallow HTTP errors
- Never import pandas
- Never do type conversion (dates, ints, booleans) — leave as strings
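The string-only rule can be demonstrated in isolation. A minimal sketch of the normalization step used at the end of `extract` (sample input values are made up):

```python
def normalize(records: list[dict]) -> list[dict[str, str]]:
    """Coerce every key and value to str; None becomes the empty string."""
    return [
        {str(k): str(v) if v is not None else "" for k, v in r.items()}
        for r in records
    ]


# Native types in, strings out — no inference, no parsing.
raw = [{"id": 123, "amount": 45.0, "note": None, "flag": True}]
print(normalize(raw))
# [{'id': '123', 'amount': '45.0', 'note': '', 'flag': 'True'}]
```

Downstream layers (CSV staging, Snowflake Bronze) depend on this invariant, which is why it is enforced at the extractor rather than left to each consumer.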
### Step 1.2 — Export from the package

Add to `packages/xo-core/src/xo_core/extractors/__init__.py`:
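The export lines are not shown in the original; they typically look like this (surrounding entries elided):

```python
from xo_core.extractors.my_api import MyApiExtractor

__all__ = [..., "MyApiExtractor"]
```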
### Step 1.3 — Type check and lint

```shell
uv sync --package xo-core
uv run ty check packages/xo-core
uv run ruff check packages/xo-core
uv run ruff format packages/xo-core
```

All three checks (`ty check`, `ruff check`, `ruff format`) must pass with zero errors.
## Phase 2 — xo-foundry: Airflow Task Wrapper

### Step 2.1 — Create the extract submodule

Create `packages/xo-foundry/src/xo_foundry/tasks/extract/{source_type}.py`:
```python
from __future__ import annotations

import logging
from typing import Any

from airflow.sdk import task

logger = logging.getLogger(__name__)


@task
def extract_myapi_records(
    extractor_config: dict[str, Any],
    path_config: dict[str, Any],
    time_window: dict[str, str] | None = None,
    **context: Any,
) -> dict[str, Any]:
    """Extract records from MyAPI and write to S3 ingest path."""
    from airflow.models import Variable

    from xo_core.extractors.my_api import MyApiExtractor
    from xo_foundry.tasks.task_utils import is_debug_logging_enabled

    api_key = Variable.get(extractor_config["api_key_var"])
    org = Variable.get("MYAPI_ORG")  # MyApiExtractor also requires the org
    extractor = MyApiExtractor(api_key=api_key, org=org)
    # ... extraction logic using csv.DictWriter (never pandas)
    return {"s3_path": "...", "record_count": 0}
```
Re-export from `packages/xo-foundry/src/xo_foundry/tasks/extract/__init__.py`:

```python
from xo_foundry.tasks.extract.my_api import extract_myapi_records

__all__ = [..., "extract_myapi_records"]
```

The backwards-compat shim at `tasks/extract_tasks.py` re-exports everything from `tasks/extract/`, so hand-written DAGs that import from `extract_tasks` continue to work without changes.
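The shim mechanics can be sketched with throwaway in-memory modules (every name below is a stand-in for illustration, not the real packages):

```python
import sys
import types

# Stand-in for tasks/extract/__init__.py: defines and publicly exports one task.
extract_pkg = types.ModuleType("demo_extract")
exec(
    "def extract_myapi_records():\n"
    "    return 'ok'\n"
    "__all__ = ['extract_myapi_records']\n",
    extract_pkg.__dict__,
)
sys.modules["demo_extract"] = extract_pkg

# Stand-in for the tasks/extract_tasks.py shim: a star re-export honoring __all__.
shim = types.ModuleType("demo_extract_tasks")
exec("from demo_extract import *", shim.__dict__)
sys.modules["demo_extract_tasks"] = shim

# The legacy import path keeps working:
from demo_extract_tasks import extract_myapi_records

print(extract_myapi_records())  # ok
```

Because the star import honors `__all__`, anything newly exported from the submodule's `__init__.py` automatically appears on the legacy path as well.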
### Step 2.2 — Create the Jinja2 extractor macro

Create `packages/xo-foundry/src/xo_foundry/dag_factory/templates/extractors/{source_type}.macro.j2`:
```jinja
{% macro extract_imports() %}from xo_foundry.tasks.extract_tasks import extract_myapi_records{% endmacro %}

{% macro extract_function_name() %}extract_myapi_records{% endmacro %}

{% macro description(source_key, source) %}{{ source_key }} data from MyAPI{% endmacro %}

{% macro extractor_config_block(source) %}    "api_key_var": "{{ source.extractor.api_key_var }}",
{% endmacro %}

{% macro intraday_extractor_config_block(source) %}    "api_key_var": "{{ source.extractor.api_key_var }}",
{% endmacro %}

{% macro intraday_extract_call(indent) %}{{ indent }}result = extract_myapi_records(
{{ indent }}    extractor_config=report_config["extractor_config"],
{{ indent }}    path_config=report_config["path_config"],
{{ indent }}    data_date=time_window["start_date_formatted"],
{{ indent }})
{% endmacro %}
```
The DAG factory templates dynamically import the correct macro via:

```jinja
{% from 'extractors/' ~ dag_source_type ~ '.macro.j2' import ... %}
```
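The dynamic-import mechanism can be exercised standalone with Jinja2's in-memory loader — a minimal sketch in which the template names and macro bodies are stand-ins:

```python
from jinja2 import DictLoader, Environment

# One macro file per source type, plus a main template that selects between
# them at render time using the dag_source_type variable.
env = Environment(loader=DictLoader({
    "extractors/myapi_records.macro.j2":
        "{% macro extract_function_name() %}extract_myapi_records{% endmacro %}",
    "main.j2":
        "{% from 'extractors/' ~ dag_source_type ~ '.macro.j2' "
        "import extract_function_name %}{{ extract_function_name() }}",
}))

# dag_source_type decides which macro file supplies the implementation.
rendered = env.get_template("main.j2").render(dag_source_type="myapi_records")
print(rendered)  # extract_myapi_records
```

Jinja2 evaluates the template-name expression in `{% from %}` at render time, which is what lets one generated-DAG template serve every extractor type.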
### Step 2.3 — Add the task function (legacy path)

Add to `packages/xo-foundry/src/xo_foundry/tasks/extract_tasks.py`:
```python
from __future__ import annotations

import logging
from typing import Any

from airflow.models import Variable
from airflow.sdk import task

# (Imports shown for completeness; the module may already have them.)
logger = logging.getLogger(__name__)


@task
def extract_myapi_records(
    start_date: str,
    end_date: str,
    endpoint: str = "records",
    **context: Any,
) -> str:
    """Extract records from MyAPI and write to S3 ingest path.

    Returns the S3 ingest path for the staged file.
    """
    from xo_core.extractors.my_api import MyApiExtractor
    from xo_foundry.s3_utils import write_csv_to_s3

    api_key = Variable.get("MYAPI_KEY")
    org = Variable.get("MYAPI_ORG")
    extractor = MyApiExtractor(api_key=api_key, org=org)
    records = extractor.extract(start_date=start_date, end_date=end_date, endpoint=endpoint)
    logger.info(f"Extracted {len(records)} records from {endpoint}")
    # Write to S3 using csv.DictWriter — never pandas
    s3_path = write_csv_to_s3(
        records=records,
        bucket=Variable.get("INGEST_BUCKET"),
        key=f"myservice/{endpoint}/full_refresh/{start_date}/data.csv",
    )
    logger.info(f"Written to S3: {s3_path}")
    return s3_path
```
Critical rules for the task wrapper:
- Use csv.DictWriter via write_csv_to_s3 — never pandas
- Pull credentials from Airflow Variables, not hardcoded
- Return the S3 path string for downstream tasks
- Log exact counts and paths
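`write_csv_to_s3` itself is not shown here. Its core serialization step — `csv.DictWriter` into an in-memory buffer, followed by a single upload — can be sketched as follows (the helper name and its boto3 usage are assumptions for illustration):

```python
import csv
import io


def records_to_csv(records: list[dict[str, str]]) -> str:
    """Serialize string-valued records to CSV text with csv.DictWriter."""
    if not records:
        return ""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(records[0].keys()))
    writer.writeheader()
    writer.writerows(records)
    return buf.getvalue()


csv_text = records_to_csv([{"id": "1", "name": "a"}, {"id": "2", "name": "b"}])
print(csv_text)
# id,name
# 1,a
# 2,b
```

The real helper would then upload the string to the ingest bucket (e.g. via boto3 `put_object`) and return the `s3://` path. Because the extractor already guarantees string values, `csv.DictWriter` writes them verbatim — exactly the property pandas would break.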
### Step 2.4 — Type check xo-foundry

```shell
uv sync --package xo-foundry
uv run ty check packages/xo-foundry
uv run ruff check packages/xo-foundry
```
## Phase 3 — DAG Factory: Schema + Config

### Step 3.1 — Add YAML schema fields (if new config keys needed)

If the new extractor needs new YAML keys, add Pydantic fields to `packages/xo-foundry/src/xo_foundry/schemas/dag_config.py`:

```python
class MyApiSourceConfig(BaseModel):
    extractor: Literal["myapi_records"]
    endpoint: str = "records"
    load_strategy: LoadStrategy
    snowflake_table: str
    snowflake_database: str
    snowflake_schema: str = "BRONZE"
```
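Validation behavior can be exercised directly. A self-contained sketch — here `LoadStrategy` is assumed to be a string enum with the two values used elsewhere in this guide; check its real definition in `dag_config.py`:

```python
from enum import Enum
from typing import Literal

from pydantic import BaseModel, ValidationError


class LoadStrategy(str, Enum):  # assumption: the real enum lives in dag_config.py
    FULL_REFRESH = "full_refresh"
    INCREMENTAL = "incremental"


class MyApiSourceConfig(BaseModel):
    extractor: Literal["myapi_records"]
    endpoint: str = "records"
    load_strategy: LoadStrategy
    snowflake_table: str
    snowflake_database: str
    snowflake_schema: str = "BRONZE"


# Valid config: defaults fill in endpoint and snowflake_schema.
cfg = MyApiSourceConfig(
    extractor="myapi_records",
    load_strategy="full_refresh",
    snowflake_table="MYAPI_RECORDS",
    snowflake_database="ACME_DB",
)
print(cfg.endpoint, cfg.snowflake_schema)  # records BRONZE

# Invalid extractor value: the Literal field rejects it at parse time.
try:
    MyApiSourceConfig(extractor="other", load_strategy="full_refresh",
                      snowflake_table="T", snowflake_database="D")
except ValidationError:
    print("rejected")  # rejected
```

This is why mistyped YAML keys fail at `validate-config` time rather than at DAG parse time in Airflow.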
### Step 3.2 — Create a template YAML config

Create: `packages/xo-foundry/configs/myapi-pipeline-template.yaml`

```yaml
# Template config for MyAPI pipelines
# Copy to apps/airflow/xo-pipelines/dags/configs/{client}-myapi-daily.yaml for production
dag:
  dag_id: "{client}_myapi_daily"
  schedule: "0 5 * * 1-6"
  catchup: false
  tags: ["{client}", "myapi", "daily"]

time_window:
  refresh_type: daily
  timezone: America/New_York
  daily_lag_minutes: 1440

sources:
  records:
    extractor: myapi_records
    endpoint: records
    load_strategy: full_refresh
    snowflake_table: MYAPI_RECORDS
    snowflake_database: "{CLIENT}_DB"
    snowflake_schema: BRONZE
```
### Step 3.3 — Validate the template

```shell
uv run xo-foundry validate-config \
    --config packages/xo-foundry/configs/myapi-pipeline-template.yaml
```
## Phase 4 — Integration Test

### Step 4.1 — Test the extractor in isolation

```shell
uv run xo-foundry test-extractor \
    --extractor myapi_records \
    --start-date 2026-02-01 \
    --end-date 2026-02-02
```

Verify:

- Records returned (check count)
- All values are strings
- No type inference artifacts (IDs not floated, dates not parsed)
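What a type-inference artifact looks like, and why the stdlib `csv` module avoids it — a small illustration with made-up data:

```python
import csv
import io

csv_text = "id,amount\n00123,7\n"

# csv.DictReader preserves every value exactly as written — strings only.
rows = list(csv.DictReader(io.StringIO(csv_text)))
print(rows[0]["id"])   # 00123 — leading zeros intact

# A type-inferring reader (e.g. pandas.read_csv) would instead parse "00123"
# as a number, silently corrupting the ID:
print(float("00123"))  # 123.0 — the artifact to watch for
```

When spot-checking extractor output, look for exactly this class of damage: stripped leading zeros, `.0` suffixes on integer IDs, and reformatted dates.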
### Step 4.2 — Test the full pipeline locally

```shell
cd apps/airflow/xo-pipelines
astro dev start
# Trigger the new DAG in the Airflow UI
# Check the Bronze table in Snowflake DEV
astro dev stop
```

Verify:

- RECORD_KEY and RECORD_HASH populated correctly
- DATE_TO_WAREHOUSE is set
- Row counts match expected output
- No NULL columns that should have values
## Phase 5 — Documentation & Deploy

### Step 5.1 — Update the extractor testing guide

If you discovered anything non-obvious during testing, add it to `guides/extractor-testing-workflow.md`.

### Step 5.2 — Deploy

Deploy the updated packages and DAG configs to Astronomer.
### Step 5.3 — Update the Snowflake object inventory

Add the new Bronze table to `reference/snowflake-object-inventory.md`.
## Checklist Summary

- Extractor class created in `xo-core/extractors/{source_name}.py`
- Returns `list[dict[str, str]]` — strings only, no pandas
- `ty check packages/xo-core` passes
- Extract submodule created at `xo-foundry/tasks/extract/{source_type}.py`
- Re-exported from `xo-foundry/tasks/extract/__init__.py`
- Jinja2 macro created at `xo-foundry/dag_factory/templates/extractors/{source_type}.macro.j2`
- All 6 macro functions implemented: `extract_imports`, `extract_function_name`, `description`, `extractor_config_block`, `intraday_extractor_config_block`, `intraday_extract_call`
- Credentials from Airflow Variables (not hardcoded)
- `ty check packages/xo-foundry` passes
- YAML template created in `xo-foundry/configs/`
- `validate-config` passes on the template
- `generate-all --check` passes (golden tests catch any template regressions)
- Integration test in local Airflow passes
- Bronze table schema added to the Snowflake object inventory
- Deployed to Astronomer

See also: extractor-testing-workflow.md