# Data Reconciliation & Enhancement Implementation Plan

**Date**: 2026-01-07
**Status**: Design Complete - Ready for Implementation
**Context**: HRNZ API keys unavailable; hybrid HRNZ scraper + TAB live capture required

---

## Executive Summary

This document outlines the implementation plan for a hybrid data ingestion system that combines:
1. **HRNZ web scraping** (historical data)
2. **TAB live API capture** (ongoing data)
3. **Reconciliation system** (deduplication & merging)
4. **Incremental updates** (minimize scraping/API calls)

**Goal**: Build a comprehensive historical database while avoiding duplicate records and minimizing scraping and API usage.

---

## Problem Statement

### Current Situation
- ✅ TAB API integration complete (live races only, no historical data)
- ✅ HRNZ web scraper complete (can extract historical data)
- ❌ HRNZ API keys not currently being issued (as of January 2026)
- ❌ No reconciliation between TAB and HRNZ data
- ❌ No incremental update strategy
- ❌ Risk of duplicate meetings/races from both sources

### Requirements
1. Import historical data via HRNZ scraper
2. Capture ongoing live data via TAB API
3. Avoid duplicate records when both sources cover the same race
4. Track data source for provenance
5. Minimize unnecessary scraping/API calls
6. Support incremental updates and backfills

---

## Architecture Design

### Data Source Tracking

Add a `data_source` field to track the origin of each record:

```sql
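-- Add to meetings table (mirrors the SQLAlchemy model and Alembic migration below)
ALTER TABLE meetings ADD COLUMN data_source VARCHAR(20) NOT NULL DEFAULT 'unknown';
ALTER TABLE meetings ADD COLUMN tab_meeting_id VARCHAR NULL;
ALTER TABLE meetings ADD COLUMN hrnz_url VARCHAR NULL;
ALTER TABLE meetings ADD COLUMN last_updated TIMESTAMP DEFAULT NOW();

-- Values: 'unknown' (pre-existing rows), 'tab_api', 'hrnz_scraper', 'manual', 'merged'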
```
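To keep the allowed `data_source` values in one place, the application could mirror them in a Python enum. The following is a minimal sketch. `DataSource` and `combine_sources` are hypothetical names. It also assumes a particular meaning for `'merged'`: re-ingesting from the same source keeps that source's label, and only a *different* source produces `'merged'`.

```python
from enum import Enum


class DataSource(str, Enum):
    """Allowed values for meetings.data_source (hypothetical helper)."""

    UNKNOWN = "unknown"  # rows that predate source tracking
    TAB_API = "tab_api"
    HRNZ_SCRAPER = "hrnz_scraper"
    MANUAL = "manual"
    MERGED = "merged"


def combine_sources(existing: str, incoming: str) -> DataSource:
    """Return the data_source a record should carry after `incoming` writes to it."""
    current = DataSource(existing)  # raises ValueError for unexpected values
    new = DataSource(incoming)
    if new is DataSource.UNKNOWN:
        return current  # an unlabelled write never downgrades the record
    if current is DataSource.UNKNOWN:
        return new  # first known source claims the record
    if current in (new, DataSource.MERGED):
        return current  # same source again, or already merged
    return DataSource.MERGED  # a second, different source contributed
```

Because `DataSource` subclasses `str`, its members compare equal to the plain strings stored in the column, so they can be passed directly to SQLAlchemy filters.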

### Meeting Deduplication Strategy

**Matching Criteria** (in order of preference):

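1. **Exact match**: Same venue name and date
2. **TAB meeting ID**: Same `tab_meeting_id`, when TAB supplies a meeting ID
3. **HRNZ meeting ID**: Same meeting `id`, when HRNZ supplies a meeting ID (rare)
4. **Fuzzy match**: Similar venue name (similarity above 0.85 after normalization) on the same date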

**Conflict Resolution**:
- Prefer TAB data (more structured, has event IDs)
- Fill fields that TAB leaves empty with HRNZ values
- Keep both source identifiers (`tab_meeting_id`, `hrnz_url`) on the merged record
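The conflict-resolution rules above amount to a field-level precedence merge. The following is a minimal sketch. It assumes both sources have already been normalized into flat dicts with the same field names. `merge_fields` is a hypothetical helper, and the field names in the example are illustrative, not the actual schema.

```python
from typing import Any, Dict, Mapping


def merge_fields(tab: Mapping[str, Any], hrnz: Mapping[str, Any]) -> Dict[str, Any]:
    """Merge two normalized records, preferring TAB values.

    HRNZ values are used only where TAB has no value (key missing or None);
    keys present only in HRNZ, such as hrnz_url, are carried over unchanged.
    """
    merged: Dict[str, Any] = dict(hrnz)  # HRNZ is the fallback layer
    for key, value in tab.items():
        if value is not None:
            merged[key] = value  # TAB wins whenever it supplies a value
        else:
            merged.setdefault(key, None)  # keep the key, but don't clobber HRNZ
    return merged


tab = {"venue": "Addington", "track_condition": None, "tab_meeting_id": "T1"}
hrnz = {"venue": "Addington Raceway", "track_condition": "Good", "hrnz_url": "https://example.invalid/r.htm"}
merged = merge_fields(tab, hrnz)
```

Here `merged` keeps TAB's `venue` and `tab_meeting_id`, takes `track_condition` from HRNZ, and retains `hrnz_url`. Empty strings count as values in this sketch; if the scraper emits blanks, treat them like `None`.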

### Database Schema Enhancements

```python
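# packages/core/storage/models.py

from sqlalchemy import Column, Date, DateTime, Index, Integer, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import relationship

# `Base` is the project's existing declarative base (defined elsewhere, not shown).
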
class Meeting(Base):
    __tablename__ = "meetings"

    id = Column(String, primary_key=True)
    meeting_date = Column(Date, nullable=False, index=True)
    venue = Column(String, nullable=False)

    # NEW: Source tracking
    data_source = Column(String(20), nullable=False, default="unknown")
    tab_meeting_id = Column(String, nullable=True)  # Original TAB ID
    hrnz_url = Column(String, nullable=True)        # HRNZ source URL
    last_updated = Column(DateTime, server_default=func.now(), onupdate=func.now())

    # Existing
    raw_json = Column(JSONB)
    races = relationship("Race", back_populates="meeting", cascade="all, delete-orphan")

class DataIngestionLog(Base):
    """Track what has been scraped/ingested to enable incremental updates."""
    __tablename__ = "data_ingestion_log"

    id = Column(Integer, primary_key=True, autoincrement=True)
    source = Column(String(20), nullable=False)  # 'tab_api' or 'hrnz_scraper'
    date_processed = Column(Date, nullable=False, index=True)
    url = Column(String, nullable=True)  # For HRNZ URLs
    status = Column(String(20), nullable=False)  # 'success', 'failed', 'partial'
    records_created = Column(Integer, default=0)
    records_updated = Column(Integer, default=0)
    errors = Column(Text, nullable=True)
    created_at = Column(DateTime, server_default=func.now())

    __table_args__ = (
        Index('idx_ingestion_source_date', 'source', 'date_processed'),
    )
```

---

## Implementation Plan

### Phase 1: Database Schema Updates (Week 1)

#### 1.1 Create Migration
```bash
docker compose run --rm worker alembic revision -m "add_data_source_tracking"
```

**Migration Content**:
```python
from alembic import op
import sqlalchemy as sa


def upgrade():
    # Add source tracking to meetings. The server default backfills existing
    # rows, so data_source can be NOT NULL, matching the model.
    op.add_column('meetings', sa.Column('data_source', sa.String(20), nullable=False, server_default='unknown'))
    op.add_column('meetings', sa.Column('tab_meeting_id', sa.String, nullable=True))
    op.add_column('meetings', sa.Column('hrnz_url', sa.String, nullable=True))
    op.add_column('meetings', sa.Column('last_updated', sa.DateTime, server_default=sa.func.now()))

    # Create ingestion log table. Counters use server_default: `default=` is a
    # Python-side default and is not emitted in the DDL.
    op.create_table(
        'data_ingestion_log',
        sa.Column('id', sa.Integer, primary_key=True, autoincrement=True),
        sa.Column('source', sa.String(20), nullable=False),
        sa.Column('date_processed', sa.Date, nullable=False),
        sa.Column('url', sa.String, nullable=True),
        sa.Column('status', sa.String(20), nullable=False),
        sa.Column('records_created', sa.Integer, server_default='0'),
        sa.Column('records_updated', sa.Integer, server_default='0'),
        sa.Column('errors', sa.Text, nullable=True),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
    )

    # Indexes matching the model: index=True on date_processed, plus the composite index
    op.create_index('ix_data_ingestion_log_date_processed', 'data_ingestion_log', ['date_processed'])
    op.create_index('idx_ingestion_source_date', 'data_ingestion_log', ['source', 'date_processed'])


def downgrade():
    op.drop_index('idx_ingestion_source_date', table_name='data_ingestion_log')
    op.drop_index('ix_data_ingestion_log_date_processed', table_name='data_ingestion_log')
    op.drop_table('data_ingestion_log')
    op.drop_column('meetings', 'last_updated')
    op.drop_column('meetings', 'hrnz_url')
    op.drop_column('meetings', 'tab_meeting_id')
    op.drop_column('meetings', 'data_source')
```

#### 1.2 Update Models
Update `packages/core/storage/models.py` with new fields.

#### 1.3 Update Repositories
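Modify `MeetingRepository.upsert()` to write the new source-tracking columns (`data_source`, `tab_meeting_id`, `hrnz_url`) without overwriting a stored source ID with `NULL` when an update omits it.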
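One way `upsert()` could handle source tracking is to let the database resolve conflicts. The row is inserted, and on a primary-key conflict it is updated while keeping any stored source ID that the incoming row leaves `NULL`. The sketch below uses the standard-library `sqlite3` module so that it runs anywhere. The `INSERT ... ON CONFLICT ... DO UPDATE` clause with `excluded` and `COALESCE` is also valid PostgreSQL. Some simplifications apply:

- The table is cut down to the source-tracking columns.
- `upsert_meeting` is a hypothetical name.
- It only covers rows that share a primary key. Matching meetings across sources remains the reconciliation service's job.

```python
import sqlite3
from typing import Any, Dict

SCHEMA = """
CREATE TABLE meetings (
    id TEXT PRIMARY KEY,
    meeting_date TEXT NOT NULL,
    venue TEXT NOT NULL,
    data_source TEXT NOT NULL DEFAULT 'unknown',
    tab_meeting_id TEXT,
    hrnz_url TEXT
)
"""

UPSERT = """
INSERT INTO meetings (id, meeting_date, venue, data_source, tab_meeting_id, hrnz_url)
VALUES (:id, :meeting_date, :venue, :data_source, :tab_meeting_id, :hrnz_url)
ON CONFLICT (id) DO UPDATE SET
    -- A different source writing to an existing row marks it as merged
    data_source = CASE
        WHEN meetings.data_source IN ('unknown', excluded.data_source)
            THEN excluded.data_source
        ELSE 'merged'
    END,
    -- Never overwrite a stored source ID with NULL
    tab_meeting_id = COALESCE(excluded.tab_meeting_id, meetings.tab_meeting_id),
    hrnz_url = COALESCE(excluded.hrnz_url, meetings.hrnz_url)
"""


def upsert_meeting(conn: sqlite3.Connection, row: Dict[str, Any]) -> None:
    """Insert or update one meeting; missing source IDs are passed as NULL."""
    params = {"tab_meeting_id": None, "hrnz_url": None, **row}
    conn.execute(UPSERT, params)
```

The venue and date are deliberately left out of the `DO UPDATE` list, so a later HRNZ write cannot replace TAB's values. This is consistent with the "prefer TAB" rule.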

---

### Phase 2: Reconciliation Service (Week 1-2)

Create a new module for the reconciliation logic.

**File**: `packages/core/storage/reconciliation.py`

```python
"""Data reconciliation service for merging TAB and HRNZ data."""

from datetime import date
from typing import Dict, List, Optional, Tuple
from sqlalchemy.orm import Session
from difflib import SequenceMatcher

from packages.core.common.logging import get_logger
from packages.core.storage.models import Meeting
from packages.core.storage.repositories import MeetingRepository

logger = get_logger(__name__)


class ReconciliationService:
    """Reconciles data from multiple sources to avoid duplicates."""

    def __init__(self, session: Session):
        self.session = session

    def find_matching_meeting(
        self,
        venue: str,
        meeting_date: date,
        source_id: Optional[str] = None,
    ) -> Optional[Meeting]:
        """Find existing meeting that matches criteria.

        Args:
            venue: Venue name
            meeting_date: Meeting date
            source_id: Original source ID (TAB or HRNZ)

        Returns:
            Existing meeting if found, None otherwise
        """
        # Strategy 1: Exact venue + date match
        existing = (
            self.session.query(Meeting)
            .filter(
                Meeting.venue == venue,
                Meeting.meeting_date == meeting_date,
            )
            .first()
        )

        if existing:
            logger.debug(f"Found exact match: {existing.id}")
            return existing

        # Strategy 2: Check source IDs
        if source_id:
            existing = (
                self.session.query(Meeting)
                .filter(
                    (Meeting.tab_meeting_id == source_id)
                    | (Meeting.id == source_id)
                )
                .first()
            )

            if existing:
                logger.debug(f"Found source ID match: {existing.id}")
                return existing

        # Strategy 3: Fuzzy venue name match + same date.
        # Pick the most similar candidate, not just the first one above threshold.
        candidates = (
            self.session.query(Meeting)
            .filter(Meeting.meeting_date == meeting_date)
            .all()
        )

        best, best_score = None, 0.0
        for candidate in candidates:
            similarity = self._venue_similarity(venue, candidate.venue)
            if similarity > best_score:
                best, best_score = candidate, similarity

        if best is not None and best_score > 0.85:  # 85% similar
            logger.debug(
                f"Found fuzzy match: {best.id} "
                f"(similarity: {best_score:.2f})"
            )
            return best

        return None

    @staticmethod
    def _venue_similarity(venue1: str, venue2: str) -> float:
        """Calculate similarity between two venue names.

        Returns:
            Similarity score (0.0 to 1.0)
        """
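        # Normalize: lowercase, drop common filler *words*. Matching whole
        # tokens avoids stripping "park" or "club" out of a longer word.
        stopwords = {"raceway", "park", "trotting", "club", "inc"}

        def normalize(v: str) -> str:
            tokens = [
                t for t in v.lower().replace(".", " ").split() if t not in stopwords
            ]
            # If every token was a filler word, fall back to the full name;
            # otherwise two such names would both become "" and score 1.0.
            return " ".join(tokens) or v.lower().strip()

        v1 = normalize(venue1)
        v2 = normalize(venue2)

        return SequenceMatcher(None, v1, v2).ratio()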

    def merge_meeting_data(
        self,
        existing: Meeting,
        new_data: Dict,
        source: str,
    ) -> Meeting:
        """Merge new data into existing meeting.

        Args:
            existing: Existing meeting record
            new_data: New data to merge
            source: Data source ('tab_api' or 'hrnz_scraper')

        Returns:
            Updated meeting
        """
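        previous_source = existing.data_source

        # Mark as merged only when a *different* source contributes;
        # re-ingesting from the same source (e.g. repeated TAB polls) must not
        # flip the flag.
        if previous_source in (None, "unknown"):
            existing.data_source = source
        elif previous_source != source:
            existing.data_source = "merged"

        # Store source-specific IDs. TAB payloads carry their ID under
        # "meeting" (as used by IngestionService); "id" is accepted as a fallback.
        if source == "tab_api":
            tab_id = new_data.get("meeting") or new_data.get("id")
            if tab_id:
                existing.tab_meeting_id = tab_id
        elif source == "hrnz_scraper" and new_data.get("source_url"):
            existing.hrnz_url = new_data["source_url"]

        # Merge raw_json (keep both sources). Build a new dict and reassign it:
        # SQLAlchemy does not detect in-place mutation of a plain JSONB column.
        raw = dict(existing.raw_json or {})
        sources = dict(raw.get("sources", {}))
        # TAB passes its raw payload directly; other sources may wrap it in "raw_json"
        sources[source] = new_data.get("raw_json") or new_data
        raw["sources"] = sources
        existing.raw_json = raw

        logger.info(
            f"Merged meeting {existing.id}: {source} data added to "
            f"existing {previous_source} data"
        )

        return existing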

    def should_skip_ingestion(
        self,
        source: str,
        meeting_date: date,
        url: Optional[str] = None,
    ) -> bool:
        """Check if date/URL already ingested successfully.

        Args:
            source: Data source
            meeting_date: Date to check
            url: Optional URL (for HRNZ)

        Returns:
            True if should skip (already ingested), False otherwise
        """
        from packages.core.storage.models import DataIngestionLog

        query = self.session.query(DataIngestionLog).filter(
            DataIngestionLog.source == source,
            DataIngestionLog.date_processed == meeting_date,
            DataIngestionLog.status == "success",
        )

        if url:
            query = query.filter(DataIngestionLog.url == url)

        return query.first() is not None

    def log_ingestion(
        self,
        source: str,
        date_processed: date,
        url: Optional[str] = None,
        status: str = "success",
        records_created: int = 0,
        records_updated: int = 0,
        errors: Optional[str] = None,
    ):
        """Log ingestion attempt for tracking.

        Args:
            source: Data source
            date_processed: Date processed
            url: Optional URL
            status: Status ('success', 'failed', 'partial')
            records_created: Number of new records
            records_updated: Number of updated records
            errors: Error messages if any
        """
        from packages.core.storage.models import DataIngestionLog

        log = DataIngestionLog(
            source=source,
            date_processed=date_processed,
            url=url,
            status=status,
            records_created=records_created,
            records_updated=records_updated,
            errors=errors,
        )

        self.session.add(log)
        self.session.commit()

        logger.info(
            f"Logged {source} ingestion for {date_processed}: "
            f"{status} ({records_created} created, {records_updated} updated)"
        )
```
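Because `find_matching_meeting` applies its strategies in a fixed order, the cascade can be unit-tested without a database by running the same logic over plain records. The following is a minimal sketch.

- `MeetingRecord` and `match_meeting` are hypothetical test helpers.
- The 0.85 threshold and the filler-word list mirror `_venue_similarity`.
- This sketch matches whole words when normalizing.

```python
from dataclasses import dataclass
from datetime import date
from difflib import SequenceMatcher
from typing import Iterable, Optional

STOPWORDS = {"raceway", "park", "trotting", "club", "inc"}
THRESHOLD = 0.85


@dataclass
class MeetingRecord:
    id: str
    venue: str
    meeting_date: date
    tab_meeting_id: Optional[str] = None


def normalize(venue: str) -> str:
    tokens = [t for t in venue.lower().split() if t not in STOPWORDS]
    return " ".join(tokens) or venue.lower().strip()


def match_meeting(records: Iterable[MeetingRecord], venue: str, meeting_date: date,
                  source_id: Optional[str] = None) -> Optional[MeetingRecord]:
    records = list(records)
    # 1. Exact venue + date
    for r in records:
        if r.venue == venue and r.meeting_date == meeting_date:
            return r
    # 2. Source ID (TAB meeting ID or primary key)
    if source_id:
        for r in records:
            if source_id in (r.tab_meeting_id, r.id):
                return r
    # 3. Best fuzzy venue match on the same date
    scored = [
        (SequenceMatcher(None, normalize(venue), normalize(r.venue)).ratio(), r)
        for r in records
        if r.meeting_date == meeting_date
    ]
    if scored:
        score, best = max(scored, key=lambda pair: pair[0])
        if score > THRESHOLD:
            return best
    return None
```

Each strategy can then be exercised with a few hand-built records, for example checking that "Alexandra Park Trotting Club" resolves to an existing "Alexandra Park" meeting on the same date.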

---

### Phase 3: Enhanced Ingestion Services (Week 2)

#### 3.1 Update TAB Ingestion Service

**File**: `packages/core/storage/ingestion.py`

Add reconciliation to the existing `IngestionService`:

```python
from packages.core.storage.reconciliation import ReconciliationService

class IngestionService:
    def __init__(self, session: Session):
        self.session = session
        self.stats = {"meetings": 0, "races": 0, "starters": 0, "errors": 0}
        self.reconciliation = ReconciliationService(session)  # NEW

    async def ingest_meeting(self, meeting_data: Dict, category: str):
        """Ingest single meeting with reconciliation."""

        # Check if already ingested
        meeting_date = parse_date(meeting_data.get("date"))
        if self.reconciliation.should_skip_ingestion(
            "tab_api", meeting_date, url=None
        ):
            logger.info(f"Skipping already ingested date: {meeting_date}")
            return

        # Check for existing meeting
        existing = self.reconciliation.find_matching_meeting(
            venue=meeting_data.get("name"),
            meeting_date=meeting_date,
            source_id=meeting_data.get("meeting"),
        )

        if existing:
            # Merge with existing
            meeting = self.reconciliation.merge_meeting_data(
                existing, meeting_data, source="tab_api"
            )
            self.stats["meetings"] += 0  # Updated, not created
        else:
            # Create new
            meeting_dict = {
                "id": meeting_data.get("meeting"),
                "meeting_date": meeting_date,
                "venue": meeting_data.get("name"),
                "data_source": "tab_api",
                "tab_meeting_id": meeting_data.get("meeting"),
                "raw_json": meeting_data,
            }
            meeting = MeetingRepository.upsert(self.session, meeting_dict)
            self.stats["meetings"] += 1

        # ... rest of ingestion logic ...

        # Log successful ingestion
        self.reconciliation.log_ingestion(
            source="tab_api",
            date_processed=meeting_date,
            status="success",
            records_created=1,
        )
```

#### 3.2 Update HRNZ Scraper Integration

**File**: `apps/backend/worker/cli.py` (scrape_hrnz command)

Add reconciliation:

```python
# Inside scrape_and_import(), within the loop over HRNZ meeting URLs
# (so `continue` moves on to the next URL)
from datetime import date as date_type

from packages.core.storage.reconciliation import ReconciliationService

# ... existing code ...

# Import to database
with get_session() as session:
    reconciliation = ReconciliationService(session)

    # Check if already scraped
    meeting_date = date_type.fromisoformat(meeting["date"])
    if reconciliation.should_skip_ingestion(
        "hrnz_scraper", meeting_date, url=url
    ):
        console.print("  [dim]Skipped (already scraped)[/dim]")
        continue

    # Check for existing meeting
    existing = reconciliation.find_matching_meeting(
        venue=meeting["venue"],
        meeting_date=meeting_date,
        source_id=meeting["id"],
    )

    if existing:
        # Merge with existing TAB data
        meeting_obj = reconciliation.merge_meeting_data(
            existing, scraped, source="hrnz_scraper"
        )
        is_new = False
    else:
        # Create new meeting
        meeting["data_source"] = "hrnz_scraper"
        meeting["hrnz_url"] = url
        meeting_obj = MeetingRepository.upsert(session, meeting)
        is_new = True
        stats["meetings"] += 1

    # ... rest of logic ...

    # Log ingestion
    reconciliation.log_ingestion(
        source="hrnz_scraper",
        date_processed=meeting_date,
        url=url,
        status="success",
        records_created=1 if is_new else 0,
        records_updated=0 if is_new else 1,
    )
```

---

### Phase 4: HRNZ Index Auto-Discovery (Week 3)

Implement automatic discovery of HRNZ meeting URLs from their results index.

**File**: `packages/hrnz_scraper/index_scraper.py`

```python
"""Scraper for HRNZ results index to discover meeting URLs."""

import asyncio
import re
from datetime import date
from typing import List, Dict, Any
from urllib.parse import urljoin

import httpx
from bs4 import BeautifulSoup

from packages.core.common.logging import get_logger

logger = get_logger(__name__)


class HRNZIndexScraper:
    """Scrapes HRNZ results index to discover meeting URLs."""

    BASE_URL = "https://infohorse.hrnz.co.nz/datahrs/results/"
    RATE_LIMIT_DELAY = 2.0

    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self._client = None
        self._last_request_time = 0.0

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            timeout=self.timeout,
            follow_redirects=True,
            headers={"User-Agent": "TipSharks/1.0 (Educational Bot)"},
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._client:
            await self._client.aclose()

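    async def _rate_limited_get(self, url: str) -> str:
        """Make a rate-limited GET request (at most one per RATE_LIMIT_DELAY seconds)."""
        import time

        if self._client is None:
            raise RuntimeError("Use HRNZIndexScraper as an async context manager")

        # monotonic() is unaffected by wall-clock adjustments
        elapsed = time.monotonic() - self._last_request_time
        if elapsed < self.RATE_LIMIT_DELAY:
            await asyncio.sleep(self.RATE_LIMIT_DELAY - elapsed)

        try:
            response = await self._client.get(url)
        finally:
            # Record the attempt even if it failed, so retries are throttled too
            self._last_request_time = time.monotonic()
        response.raise_for_status()
        return response.text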

    async def discover_month_urls(
        self, year: int, month: int
    ) -> List[Dict[str, Any]]:
        """Discover all meeting URLs for a specific month.

        Args:
            year: Year (e.g., 2024)
            month: Month (1-12)

        Returns:
            List of meeting dicts with 'url', 'venue', 'date'
        """
        # The index URL pattern may vary - this needs to be determined.
        # For now, fetch results.htm and filter its links by month.
        index_url = urljoin(self.BASE_URL, "results.htm")

        html = await self._rate_limited_get(index_url)
        soup = BeautifulSoup(html, "html.parser")

        meetings = []
        seen = set()

        # Result pages are named DDMMYYCCrs.htm. Extract the date from that
        # filename match so other digits elsewhere in the href are ignored.
        pattern = re.compile(r"(\d{2})(\d{2})(\d{2})(\d{2})rs\.htm")

        for link in soup.find_all("a", href=True):
            match = pattern.search(link["href"])
            if not match:
                continue

            # Resolve relative hrefs so callers receive fetchable URLs
            url = urljoin(index_url, link["href"])
            if url in seen:
                continue
            seen.add(url)

            day, mon, yr, _ = match.groups()
            # Convert YY to YYYY (assume 2000-2099)
            year_full = 2000 + int(yr)

            try:
                meeting_date = date(year_full, int(mon), int(day))
            except ValueError:
                logger.warning(f"Invalid date in URL: {url}")
                continue

            # Filter by requested month
            if meeting_date.year == year and meeting_date.month == month:
                meetings.append({
                    "url": url,
                    "venue": link.get_text(strip=True),
                    "date": meeting_date.isoformat(),
                })

        logger.info(f"Discovered {len(meetings)} meetings for {year}-{month:02d}")
        return meetings

    async def discover_date_range(
        self, start_date: date, end_date: date
    ) -> List[str]:
        """Discover all meeting URLs within a date range.

        Args:
            start_date: Start date
            end_date: End date

        Returns:
            List of meeting URLs
        """
        urls = []

        # Iterate through months
        current = start_date.replace(day=1)
        while current <= end_date:
            month_meetings = await self.discover_month_urls(
                current.year, current.month
            )

            for meeting in month_meetings:
                meeting_date = date.fromisoformat(meeting["date"])
                if start_date <= meeting_date <= end_date:
                    urls.append(meeting["url"])

            # Next month
            if current.month == 12:
                current = current.replace(year=current.year + 1, month=1)
            else:
                current = current.replace(month=current.month + 1)

        logger.info(
            f"Discovered {len(urls)} meeting URLs from {start_date} to {end_date}"
        )
        return urls
```
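The index scraper converts every two-digit year to 20YY. That would break if a backfill ever reaches pre-2000 results. The following is a minimal sketch of a filename parser that uses a pivot year instead.

- `parse_results_filename` is a hypothetical helper.
- The pivot rule is an assumption: years after the current two-digit year are read as 19YY, which is safe for results pages because they never lie in the future.
- This plan does not define the trailing `CC` digits, so the parser returns them unchanged.

```python
import re
from datetime import date
from typing import Optional, Tuple

# DDMMYYCCrs.htm, the results-page naming used by the index scraper
RESULTS_FILE = re.compile(r"(\d{2})(\d{2})(\d{2})(\d{2})rs\.htm")


def parse_results_filename(url: str, today: Optional[date] = None) -> Optional[Tuple[date, str]]:
    """Return (meeting_date, trailing CC digits), or None if the URL doesn't match."""
    match = RESULTS_FILE.search(url)
    if not match:
        return None
    day, month, yy, cc = match.groups()
    today = today or date.today()
    # Pivot: two-digit years up to the current one are 20YY, later ones 19YY
    century = 2000 if int(yy) <= today.year % 100 else 1900
    try:
        return date(century + int(yy), int(month), int(day)), cc
    except ValueError:
        return None  # e.g. 31 April
```

The `today` parameter exists mainly so tests can pin the pivot; production calls can omit it.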

**Update CLI** to support auto-discovery:

```bash
# Proposed: existing scrape-hrnz command with a new --auto-discover flag
docker compose run --rm worker python -m apps.backend.worker.cli scrape-hrnz \
  --from 2024-01-01 \
  --to 2024-12-31 \
  --auto-discover
```

---

### Phase 5: Continuous TAB Capture (Week 3)

Deploy continuous ingestion as a Docker service.

**File**: `docker-compose.yml` (add service)

```yaml
services:
  # ... existing services ...

  tab-ingestion-worker:
    build:
      context: .
      dockerfile: docker/Dockerfile.worker
    command: python infrastructure/scripts/continuous_ingest.py
    restart: unless-stopped
    environment:
      - TAB_MOCK_MODE=false
      - LOG_LEVEL=INFO
    depends_on:
      - db
    networks:
      - default
```

**File**: `infrastructure/scripts/continuous_ingest.py`

```python
"""Continuous TAB API ingestion script."""

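import asyncio
import time
from datetime import datetime
from zoneinfo import ZoneInfo

from packages.core.common.logging import get_logger, setup_logging
from packages.core.storage.database import get_session
from packages.core.storage.ingestion import IngestionService

setup_logging()
logger = get_logger(__name__)

INTERVAL_MINUTES = 30

# Meeting dates are assumed to be NZ calendar dates. Containers usually run on
# UTC, where date.today() is still the previous day until about midday NZ time.
# ZoneInfo needs time zone data in the image (system zoneinfo or the tzdata package).
NZ_TZ = ZoneInfo("Pacific/Auckland")


async def ingest_today():
    """Ingest today's races (NZ calendar date)."""
    today = datetime.now(NZ_TZ).date()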

    logger.info(f"Ingesting races for {today}")

    with get_session() as session:
        service = IngestionService(session)

        try:
            meetings, races, starters = await service.ingest_date_range(
                today, today
            )
            logger.info(
                f"Ingested: {meetings} meetings, {races} races, "
                f"{starters} starters"
            )
        except Exception as e:
            logger.error(f"Ingestion failed: {e}", exc_info=True)


def main():
    """Run continuous ingestion loop."""
    logger.info(f"Starting continuous TAB ingestion (every {INTERVAL_MINUTES} min)")

    while True:
        try:
            asyncio.run(ingest_today())
        except Exception as e:
            logger.error(f"Ingestion cycle failed: {e}", exc_info=True)

        # Sleep until next cycle
        logger.info(f"Sleeping for {INTERVAL_MINUTES} minutes...")
        time.sleep(INTERVAL_MINUTES * 60)


if __name__ == "__main__":
    main()
```

---

## Deployment Plan

### Week 1: Foundation
- ✅ Create database migration for source tracking
- ✅ Implement `ReconciliationService`
- ✅ Update `MeetingRepository` to use new fields
- ✅ Add ingestion logging
- 🧪 Test reconciliation logic

### Week 2: Integration
- ✅ Update TAB `IngestionService` with reconciliation
- ✅ Update HRNZ scraper command with reconciliation
- ✅ Add incremental update checks
- 🧪 Test hybrid ingestion (TAB + HRNZ)

### Week 3: Automation
- ✅ Implement HRNZ index auto-discovery
- ✅ Deploy continuous TAB ingestion service
- ✅ Create monitoring and alerting
- 🧪 End-to-end testing

### Week 4: Historical Backfill
- 📊 Run HRNZ auto-discovery for 2024
- 🕷️ Scrape discovered meetings
- 🔄 Compute ratings for all data
- ✅ Validate data quality

---

## Monitoring & Alerting

### Key Metrics to Track

1. **Ingestion Statistics**:
   - Meetings per day (TAB vs HRNZ)
   - Duplicates detected and merged
   - Failed ingestions

2. **Data Quality**:
   - Meetings without races
   - Races without starters
   - Missing ratings

3. **System Health**:
   - TAB API response times
   - HRNZ scraper error rate
   - Database size growth

### Dashboard Queries

```sql
-- Daily ingestion summary
SELECT
    source,
    date_processed,
    COUNT(*) as attempts,
    SUM(records_created) as created,
    SUM(records_updated) as updated,
    COUNT(*) FILTER (WHERE status = 'failed') as failures
FROM data_ingestion_log
WHERE date_processed >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY source, date_processed
ORDER BY date_processed DESC;

-- Data source distribution
SELECT
    data_source,
    COUNT(*) as meetings,
    MIN(meeting_date) as earliest,
    MAX(meeting_date) as latest
FROM meetings
GROUP BY data_source;

-- Merged meetings (from both sources)
SELECT
    id,
    venue,
    meeting_date,
    tab_meeting_id,
    hrnz_url
FROM meetings
WHERE data_source = 'merged'
ORDER BY meeting_date DESC
LIMIT 100;
```
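The dashboard queries above cover ingestion volume but not the data-quality metrics listed earlier. The following sketch expresses two of those metrics as anti-joins. It runs against an in-memory SQLite database so that it is self-contained.

- The `races.meeting_id` and `starters.race_id` foreign-key columns, and the `starters` table name, are assumptions about the existing schema.
- `run_quality_checks` is a hypothetical helper.
- The SQL is plain enough to run unchanged on PostgreSQL.

```python
import sqlite3
from typing import Dict, List

QUALITY_CHECKS: Dict[str, str] = {
    # Meetings with no races attached
    "meetings_without_races": """
        SELECT m.id
        FROM meetings m
        LEFT JOIN races r ON r.meeting_id = m.id
        WHERE r.id IS NULL
        ORDER BY m.id
    """,
    # Races with no starters attached
    "races_without_starters": """
        SELECT r.id
        FROM races r
        LEFT JOIN starters s ON s.race_id = r.id
        WHERE s.id IS NULL
        ORDER BY r.id
    """,
}


def run_quality_checks(conn) -> Dict[str, List[str]]:
    """Return the offending IDs for each check, using any DB-API connection."""
    results: Dict[str, List[str]] = {}
    for name, sql in QUALITY_CHECKS.items():
        cur = conn.cursor()
        cur.execute(sql)
        results[name] = [row[0] for row in cur.fetchall()]
    return results
```

Counting the returned IDs gives the dashboard metric. Listing them gives a worklist for re-scraping.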

---

## Success Criteria

### Week 1 (Foundation)
- ✅ Migration applied successfully
- ✅ No duplicate meetings created during testing
- ✅ Source tracking working correctly

### Week 2 (Integration)
- ✅ TAB + HRNZ ingestion both use reconciliation
- ✅ Incremental updates skip already-processed dates
- ✅ Merged meetings track both sources

### Week 3 (Automation)
- ✅ Auto-discovery finds all meetings for test month
- ✅ Continuous ingestion runs for 24 hours without errors
- ✅ New races captured within 30 minutes of completion

### Week 4 (Historical Backfill & Production)
- ✅ 1+ years of historical data imported
- ✅ Daily live capture operational
- ✅ <1% duplicate rate
- ✅ Data quality metrics met

---

## Risk Mitigation

### Risk: HRNZ HTML Structure Changes
**Mitigation**:
- Comprehensive error handling
- Alerting on parse failures
- Fallback to manual URL collection

### Risk: TAB API Rate Limiting
**Mitigation**:
- Rate limiting between requests (e.g. the 2 s delay used by the HRNZ index scraper)
- Exponential backoff on errors
- Monitoring API response codes
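Exponential backoff is listed as a mitigation but is not sketched elsewhere in the plan. The following is a minimal async version. `retry_with_backoff` and its default parameters are hypothetical. It uses "full jitter": each wait is a random value between 0 and the capped exponential delay, which keeps several workers from retrying in lockstep. The `sleep` parameter is injectable so tests don't actually wait.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    *,
    retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `operation`, retrying failures with capped, fully jittered backoff.

    Makes at most `retries + 1` attempts and re-raises the last error.
    """
    for attempt in range(retries + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == retries:
                raise
            # Delay ceiling doubles each attempt: base, 2*base, 4*base, ... up to max_delay
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            await sleep(random.uniform(0, ceiling))
    raise AssertionError("unreachable")
```

A single request could be wrapped as `await retry_with_backoff(lambda: fetch(url))`, where `fetch` stands for whatever coroutine the TAB client exposes. That coroutine must raise on 429/5xx responses (e.g. via `raise_for_status()`) for the retry to trigger.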

### Risk: Duplicate Records Despite Reconciliation
**Mitigation**:
- Fuzzy matching with high threshold (85%)
- Manual review of merged meetings
- Periodic deduplication script
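The periodic deduplication script could scan for meetings that slipped past reconciliation by comparing venues pairwise within each date. The following is a minimal sketch.

- `find_duplicate_candidates` is a hypothetical helper.
- It reuses the 0.85 threshold and the same filler-word normalization as the reconciliation service.
- It only *flags* pairs for the manual review step above rather than merging them.

```python
from collections import defaultdict
from datetime import date
from difflib import SequenceMatcher
from itertools import combinations
from typing import Dict, Iterable, List, Tuple

STOPWORDS = {"raceway", "park", "trotting", "club", "inc"}


def _normalize(venue: str) -> str:
    tokens = [t for t in venue.lower().split() if t not in STOPWORDS]
    return " ".join(tokens) or venue.lower().strip()


def find_duplicate_candidates(
    meetings: Iterable[Tuple[str, str, date]],  # (id, venue, meeting_date)
    threshold: float = 0.85,
) -> List[Tuple[str, str, float]]:
    """Return (id_a, id_b, similarity) for same-date meetings with similar venues.

    Comparisons are pairwise within each date, which stays cheap because a
    single day only has a handful of meetings.
    """
    by_date: Dict[date, List[Tuple[str, str]]] = defaultdict(list)
    for meeting_id, venue, meeting_date in meetings:
        by_date[meeting_date].append((meeting_id, _normalize(venue)))

    pairs = []
    for day in sorted(by_date):
        for (id_a, v_a), (id_b, v_b) in combinations(by_date[day], 2):
            score = SequenceMatcher(None, v_a, v_b).ratio()
            if score > threshold:
                pairs.append((id_a, id_b, round(score, 3)))
    return pairs
```

Feeding it `(id, venue, meeting_date)` rows from the `meetings` table yields a short list of suspect pairs per run. Tracking its length over time is one way to measure the duplicate rate.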

### Risk: Database Growth
**Mitigation**:
- Monitor disk usage
- Implement data retention policy
- Archive old meetings if needed

---

## Conclusion

This implementation plan provides a complete solution for:
- ✅ Historical data via HRNZ scraping
- ✅ Live data via TAB API
- ✅ Deduplication and reconciliation
- ✅ Incremental updates
- ✅ Automated discovery
- ✅ Production monitoring

**Timeline**: 4 weeks from start to production-ready
**Effort**: ~80-100 hours development + testing
**Result**: Comprehensive NZ harness racing database with ongoing updates

---

**Next Step**: Begin Phase 1 implementation (database migration).
