
Playbook: Adding a New Data Source Extractor

Purpose: Step-by-step guide to add a new API or file-based extractor to the platform. Follow in order — each step builds on the previous.


Overview

Adding a new extractor involves three layers:

1. xo-core — the extractor class (API client + data fetching logic)
2. xo-foundry — the Airflow task wrapper
3. DAG Factory — YAML config + schema update (if needed)


Phase 1 — xo-core: Extractor Class

Step 1.1 — Create the extractor file

Location: packages/xo-core/src/xo_core/extractors/{source_name}.py

from __future__ import annotations

import csv
import io
import logging
from typing import Any

import requests

from xo_core.extractors.base import BaseExtractor

logger = logging.getLogger(__name__)


class MyApiExtractor(BaseExtractor):
    """Extracts records from MyAPI.

    Returns records as list[dict[str, str]] — always strings, no type inference.
    """

    BASE_URL = "https://api.myservice.com/v1"

    def __init__(self, api_key: str, org: str) -> None:
        self.api_key = api_key
        self.org = org
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        })

    def extract(
        self,
        start_date: str,
        end_date: str,
        endpoint: str = "records",
    ) -> list[dict[str, str]]:
        """Extract records for the given date window.

        Args:
            start_date: ISO8601 start (inclusive)
            end_date: ISO8601 end (exclusive)
            endpoint: API endpoint to hit

        Returns:
            List of records as string dicts (never parsed types)
        """
        logger.info(f"Fetching {endpoint}: start={start_date} end={end_date}")

        url = f"{self.BASE_URL}/{self.org}/{endpoint}"
        params = {"from": start_date, "to": end_date}

        # Explicit timeout so a hung connection fails the task instead of blocking forever
        response = self.session.get(url, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        records = data.get("items", [])

        # Convert all values to strings — never leave as native Python types
        return [{str(k): str(v) if v is not None else "" for k, v in r.items()} for r in records]

Critical rules for the extractor class:

- Return list[dict[str, str]] — strings only, no type inference
- Log exact parameter values (not summaries)
- Use raise_for_status() — never swallow HTTP errors
- Never import pandas
- Never do type conversion (dates, ints, booleans) — leave values as strings
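The strings-only rule is easy to unit test without hitting the API. A minimal sketch, with the dict comprehension from extract pulled out into a standalone helper (coerce_record is a hypothetical name, not part of xo-core):

```python
def coerce_record(record: dict) -> dict[str, str]:
    """Coerce every key and value to str; None becomes an empty string."""
    return {str(k): str(v) if v is not None else "" for k, v in record.items()}


# Sanity check: ints, floats, and None never leak through as native types
row = coerce_record({"id": 42, "score": 3.5, "note": None})
print(row)  # → {'id': '42', 'score': '3.5', 'note': ''}
```

A test like this catches accidental reintroduction of type inference long before the Bronze table does.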

Step 1.2 — Export from the package

Add to packages/xo-core/src/xo_core/extractors/__init__.py:

from xo_core.extractors.my_api import MyApiExtractor

__all__ = [
    ...,
    "MyApiExtractor",
]

Step 1.3 — Type check and lint

uv sync --package xo-core
uv run ty check packages/xo-core
uv run ruff check packages/xo-core
uv run ruff format packages/xo-core

All three checks (ty, ruff check, ruff format) must pass with zero errors.


Phase 2 — xo-foundry: Airflow Task Wrapper

Step 2.1 — Add the task function

Add to packages/xo-foundry/src/xo_foundry/tasks/extract_tasks.py:

@task
def extract_myapi_records(
    start_date: str,
    end_date: str,
    endpoint: str = "records",
    **context: Any,
) -> str:
    """Extract records from MyAPI and write to S3 ingest path.

    Returns the S3 ingest path for the staged file.
    """
    from xo_core.extractors.my_api import MyApiExtractor
    from xo_foundry.s3_utils import write_csv_to_s3

    api_key = Variable.get("MYAPI_KEY")
    org = Variable.get("MYAPI_ORG")

    extractor = MyApiExtractor(api_key=api_key, org=org)
    records = extractor.extract(start_date=start_date, end_date=end_date, endpoint=endpoint)

    logger.info(f"Extracted {len(records)} records from {endpoint}")

    # Write to S3 using csv.DictWriter — never pandas
    s3_path = write_csv_to_s3(
        records=records,
        bucket=Variable.get("INGEST_BUCKET"),
        key=f"myservice/{endpoint}/full_refresh/{start_date}/data.csv",
    )

    logger.info(f"Written to S3: {s3_path}")
    return s3_path

Critical rules for the task wrapper:

- Use csv.DictWriter via write_csv_to_s3 — never pandas
- Pull credentials from Airflow Variables, not hardcoded
- Return the S3 path string for downstream tasks
- Log exact counts and paths
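For reference, the CSV staging side of write_csv_to_s3 can be sketched with the stdlib alone. The records_to_csv helper below is hypothetical (the real function also handles the S3 upload); it shows why pandas is never needed for this step:

```python
import csv
import io


def records_to_csv(records: list[dict[str, str]]) -> str:
    """Serialize string-dict records to CSV text with csv.DictWriter (no pandas)."""
    if not records:
        return ""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(records[0].keys()))
    writer.writeheader()
    writer.writerows(records)
    return buf.getvalue()


csv_text = records_to_csv([{"id": "1", "name": "a"}, {"id": "2", "name": "b"}])
print(csv_text)
```

Because the extractor already guarantees string values, DictWriter writes them verbatim: no float coercion, no date parsing, no dtype surprises.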

Step 2.2 — Type check xo-foundry

uv sync --package xo-foundry
uv run ty check packages/xo-foundry
uv run ruff check packages/xo-foundry

Phase 3 — DAG Factory: Schema + Config

Step 3.1 — Add YAML schema fields (if new config keys needed)

If the new extractor needs new YAML keys, add Pydantic fields to: packages/xo-foundry/src/xo_foundry/schemas/dag_config.py

class MyApiSourceConfig(BaseModel):
    extractor: Literal["myapi_records"]
    endpoint: str = "records"
    load_strategy: LoadStrategy
    snowflake_table: str
    snowflake_database: str
    snowflake_schema: str = "BRONZE"

Step 3.2 — Create a template YAML config

Create: packages/xo-foundry/configs/myapi-pipeline-template.yaml

# Template config for MyAPI pipelines
# Copy to apps/airflow/xo-pipelines/dags/configs/{client}-myapi-daily.yaml for production

dag:
  dag_id: {client}_myapi_daily
  schedule: "0 5 * * 1-6"
  catchup: false
  tags: ["{client}", "myapi", "daily"]
  time_window:
    refresh_type: daily
    timezone: America/New_York
    daily_lag_minutes: 1440

sources:
  records:
    extractor: myapi_records
    endpoint: records
    load_strategy: full_refresh
    snowflake_table: MYAPI_RECORDS
    snowflake_database: "{CLIENT}_DB"
    snowflake_schema: BRONZE

Step 3.3 — Validate the template

uv run xo-foundry validate-config \
  --config packages/xo-foundry/configs/myapi-pipeline-template.yaml

Phase 4 — Integration Test

Step 4.1 — Test the extractor in isolation

uv run xo-foundry test-extractor \
  --extractor myapi_records \
  --start-date 2026-02-01 \
  --end-date 2026-02-02

Verify:

- Records returned (check count)
- All values are strings
- No type inference artifacts (IDs not floated, dates not parsed)
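These checks can be scripted against the extractor's output. A minimal sketch (verify_records is a hypothetical helper; the floated-ID heuristic is one possible check, not an exhaustive one):

```python
def verify_records(records: list[dict]) -> None:
    """Fail loudly on empty output, non-string values, or type-inference artifacts."""
    assert records, "no records returned"
    for r in records:
        for k, v in r.items():
            assert isinstance(k, str) and isinstance(v, str), f"non-string at {k!r}"
            # A trailing '.0' on a numeric-looking value suggests an int was floated
            assert not (v.endswith(".0") and v[:-2].isdigit()), f"floated ID? {k}={v}"


verify_records([{"id": "123", "date": "2026-02-01"}])  # passes silently
```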

Step 4.2 — Test the full pipeline locally

cd apps/airflow/xo-pipelines
astro dev start
# Trigger the new DAG in Airflow UI
# Check Bronze table in Snowflake DEV
astro dev stop

Verify:

- RECORD_KEY and RECORD_HASH populated correctly
- DATE_TO_WAREHOUSE is set
- Row counts match expected output
- No NULL columns that should have values


Phase 5 — Documentation & Deploy

Step 5.1 — Update the extractor testing guide

If you discovered anything non-obvious during testing, add it to: guides/extractor-testing-workflow.md

Step 5.2 — Deploy

cd apps/airflow/xo-pipelines
astro deploy <deployment-id>

Step 5.3 — Update the Snowflake object inventory

Add the new Bronze table to: reference/snowflake-object-inventory.md


Checklist Summary

  • Extractor class created in xo-core/extractors/
  • Returns list[dict[str, str]] — strings only
  • No pandas imports in xo-foundry
  • ty check packages/xo-core passes
  • Task wrapper created in xo-foundry/tasks/extract_tasks.py
  • Credentials from Airflow Variables (not hardcoded)
  • ty check packages/xo-foundry passes
  • YAML template created in xo-foundry/configs/
  • validate-config passes on template
  • Integration test in local Airflow passes
  • Bronze table schema added to Snowflake object inventory
  • Deployed to Astronomer

See also: extractor-testing-workflow.md