Coverage for packages / core / storage / repositories.py: 24%
378 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:37 +1200
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:37 +1200
1"""Repository layer for database operations with idempotent upserts.
3Supports TAB Affiliates API data structures.
4"""
6import copy
7import hashlib
8from datetime import UTC, date, datetime
9from typing import Any
11from sqlalchemy import and_, desc, func, or_
12from sqlalchemy.dialects.postgresql import insert
13from sqlalchemy.orm import Session, joinedload
15from packages.core.common.cache import cache_get, cache_set
16from packages.core.common.logging import get_logger
17from packages.core.storage.audit import AuditLogger
18from packages.core.storage.models import (
19 BarrierAdjustment,
20 Driver,
21 EntityType,
22 HandicapAdjustment,
23 Horse,
24 Meeting,
25 Race,
26 RatingSnapshot,
27 Starter,
28 Trainer,
29)
31logger = get_logger(__name__)
33_HRNZ_ID_MODULUS = 2147483647
36def normalize_entity_id(value: Any, fallback_name: str | None = None) -> int | None:
37 """Normalize entity identifiers across TAB/HRNZ sources."""
38 if value is None and not fallback_name:
39 return None
41 if isinstance(value, int):
42 return value
44 if isinstance(value, str):
45 stripped = value.strip()
46 if stripped.isdigit():
47 return int(stripped)
48 if stripped:
49 return (
50 int(hashlib.md5(stripped.encode()).hexdigest()[:8], 16)
51 % _HRNZ_ID_MODULUS
52 )
54 if fallback_name:
55 return (
56 int(hashlib.md5(fallback_name.encode()).hexdigest()[:8], 16)
57 % _HRNZ_ID_MODULUS
58 )
60 return None
def normalize_runner_data(runner_data: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of a TAB/HRNZ runner payload with canonical keys filled in.

    The input mapping is shallow-copied, so the caller's dict is not modified.
    Alias keys are collapsed onto canonical ones (``name``/``horse_name``,
    ``jockey``/``driver_name``, ``trainer_name``), horse/driver/trainer IDs
    are passed through ``normalize_entity_id`` with the matching name as the
    fallback, and a fixed set of optional keys is guaranteed to exist
    (defaulting to ``None``).

    Args:
        runner_data: Raw runner mapping; a falsy value is treated as empty.

    Returns:
        New dict in the normalized shape.
    """
    record = dict(runner_data or {})
    pick = record.get

    # NOTE(review): every runner lacking both a name and an ID falls back to
    # "Unknown" and therefore receives the same hashed horse_id - confirm
    # that this is intended.
    horse_label = pick("name") or pick("horse_name") or pick("horse") or "Unknown"
    record["name"] = horse_label
    record["horse_name"] = horse_label
    record["horse_id"] = normalize_entity_id(
        pick("horse_id") or pick("horseId") or pick("horse_uuid"),
        fallback_name=horse_label,
    )

    driver_label = pick("jockey") or pick("driver_name") or pick("driver")
    if driver_label:
        record["driver_name"] = driver_label
        record["jockey"] = driver_label

    trainer_label = pick("trainer_name") or pick("trainer")
    if trainer_label:
        record["trainer_name"] = trainer_label

    record["driver_id"] = normalize_entity_id(
        pick("driver_id") or pick("driverId") or pick("driver_uuid"),
        fallback_name=driver_label,
    )
    record["trainer_id"] = normalize_entity_id(
        pick("trainer_id") or pick("trainerId") or pick("trainer_uuid"),
        fallback_name=trainer_label,
    )

    if "runner_number" not in record:
        record["runner_number"] = pick("number") or pick("runnerNo")

    # Copy an alias onto its canonical key only when the canonical key is
    # absent. Order matters: "handicap" may read the "handicap_m" value that
    # was filled in one step earlier.
    for target, alias in (
        ("barrier", "barrier_draw"),
        ("handicap_m", "handicap"),
        ("handicap", "handicap_m"),
    ):
        if target not in record:
            record[target] = pick(alias)

    # Purely numeric placing strings become integers; anything else is kept.
    raw_placing = pick("placing")
    if isinstance(raw_placing, str):
        trimmed = raw_placing.strip()
        if trimmed.isdigit():
            record["placing"] = int(trimmed)

    # Guarantee the optional keys exist so consumers can index them directly.
    for optional_key in (
        "barrier_position",
        "margin",
        "race_time",
        "placing",
        "barrier",
        "handicap_m",
        "runner_number",
        "driver_name",
        "trainer_name",
    ):
        record.setdefault(optional_key, None)

    return record
def normalize_race_data(race_data: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of a race payload with a consistent nested structure.

    ``distance_m`` is filled from ``distance`` when missing. If ``raw_json``
    is a dict, ``distance_m`` and ``race_number`` are copied into it when it
    lacks them, and every dict entry of its ``starters`` list is run through
    ``normalize_runner_data``.

    Args:
        race_data: Raw race mapping; a falsy value is treated as empty.

    Returns:
        Normalized deep copy; the input is left untouched.
    """
    normalized = copy.deepcopy(race_data or {})

    if "distance" in normalized and "distance_m" not in normalized:
        normalized["distance_m"] = normalized.get("distance")

    raw = normalized.get("raw_json")
    if not isinstance(raw, dict):
        return normalized

    # Push top-level values down into the nested payload when it lacks them.
    for field in ("distance_m", "race_number"):
        if field not in raw and normalized.get(field) is not None:
            raw[field] = normalized.get(field)

    runners = raw.get("starters")
    if isinstance(runners, list):
        raw["starters"] = [
            normalize_runner_data(entry) if isinstance(entry, dict) else entry
            for entry in runners
        ]

    normalized["raw_json"] = raw
    return normalized
def _name_hash_id(name: str) -> int:
    """Hash a person name into a non-negative 31-bit integer ID.

    Shared implementation behind ``generate_driver_id`` and
    ``generate_trainer_id``. The name is stripped and upper-cased first so
    that case or surrounding whitespace do not change the ID.

    Raises:
        ValueError: If the name is empty or contains only whitespace.
    """
    normalized = name.strip().upper() if name else ""
    # Validate after stripping: a whitespace-only name would otherwise hash
    # the empty string and give every blank name the same ID.
    if not normalized:
        raise ValueError("Cannot generate ID from empty name")
    hash_bytes = hashlib.sha256(normalized.encode("utf-8")).digest()
    # First four digest bytes, with the top bit masked off to stay positive.
    return int.from_bytes(hash_bytes[:4], "big") & 0x7FFFFFFF


def generate_driver_id(name: str) -> int:
    """Generate deterministic integer ID from driver name.

    Uses SHA256 hash truncated to 31 bits (non-negative integer).
    With only 31 bits, collisions are not negligible: by the birthday bound
    a clash becomes more likely than not at roughly 55,000 distinct names.

    Args:
        name: Driver name

    Returns:
        Non-negative integer ID

    Raises:
        ValueError: If the name is empty or whitespace-only.
    """
    return _name_hash_id(name)


def generate_trainer_id(name: str) -> int:
    """Generate deterministic integer ID from trainer name.

    Uses SHA256 hash truncated to 31 bits (non-negative integer); the same
    collision caveat as for ``generate_driver_id`` applies.

    Args:
        name: Trainer name

    Returns:
        Non-negative integer ID

    Raises:
        ValueError: If the name is empty or whitespace-only.
    """
    return _name_hash_id(name)
188def parse_tab_datetime(dt_string: str | None) -> datetime | None:
189 """Parse TAB API datetime string.
191 Args:
192 dt_string: ISO datetime string (e.g., "2024-01-15T14:30:00+13:00")
194 Returns:
195 datetime object or None if parsing fails
196 """
197 if not dt_string:
198 return None
199 try:
200 # Try ISO format with timezone
201 return datetime.fromisoformat(dt_string.replace("Z", "+00:00"))
202 except ValueError:
203 try:
204 # Try without timezone
205 return datetime.fromisoformat(dt_string[:19])
206 except ValueError:
207 logger.warning(f"Failed to parse datetime: {dt_string}")
208 return None
211def parse_tab_date(date_string: str | None) -> date | None:
212 """Parse TAB API date from datetime string or date string.
214 Args:
215 date_string: Date string (e.g., "2024-01-15" or "20240115" or full datetime)
217 Returns:
218 date object or None if parsing fails
219 """
220 if not date_string:
221 return None
222 try:
223 # Try ISO date format YYYY-MM-DD
224 if len(date_string) >= 10 and "-" in date_string:
225 return date.fromisoformat(date_string[:10])
226 # Try YYYYMMDD format
227 elif len(date_string) >= 8 and date_string[:8].isdigit():
228 return datetime.strptime(date_string[:8], "%Y%m%d").date()
229 else:
230 return None
231 except ValueError:
232 logger.warning(f"Failed to parse date: {date_string}")
233 return None
class MeetingRepository:
    """Repository for meeting operations."""

    @staticmethod
    def upsert(session: Session, meeting_data: dict[str, Any]) -> Meeting:
        """Upsert meeting from TAB API data.

        Args:
            session: Database session
            meeting_data: Raw meeting data from TAB API

        Returns:
            Meeting instance reflecting the row as stored after the upsert

        Raises:
            ValueError: If the payload has no "meeting" ID or no parseable date.
        """
        # TAB API uses "meeting" as the ID field (string)
        meeting_id = meeting_data.get("meeting")
        if not meeting_id:
            raise ValueError("Meeting data missing 'meeting' ID")

        # Parse meeting date from various formats
        meeting_date_raw = meeting_data.get("date") or meeting_data.get("meeting_date")
        meeting_date = parse_tab_date(meeting_date_raw)
        if not meeting_date:
            raise ValueError(f"Could not parse meeting date: {meeting_date_raw}")

        # Use meeting name as venue. ``or`` (rather than a .get default) also
        # covers payloads where the key is present but null/empty.
        venue = meeting_data.get("name") or "Unknown"

        # Category: T (Thoroughbred), H (Harness), G (Greyhound)
        category = meeting_data.get("category") or "H"

        # Upsert using PostgreSQL INSERT ... ON CONFLICT
        stmt = insert(Meeting).values(
            id=meeting_id,
            meeting_date=meeting_date,
            venue=venue,
            category=category,
            raw_json=meeting_data,
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "meeting_date": stmt.excluded.meeting_date,
                "venue": stmt.excluded.venue,
                "category": stmt.excluded.category,
                "raw_json": stmt.excluded.raw_json,
                # Takes whatever the INSERT would have written; presumably a
                # column default supplies the timestamp - TODO confirm.
                "updated_at": stmt.excluded.updated_at,
            },
        )

        session.execute(stmt)
        session.flush()

        # The upsert bypasses the ORM identity map, so a Meeting already
        # loaded in this session would otherwise come back with stale
        # attributes; populate_existing() refreshes it from the row.
        meeting = (
            session.query(Meeting)
            .populate_existing()
            .filter(Meeting.id == meeting_id)
            .one()
        )
        logger.debug(
            f"Upserted meeting {meeting_id} ({venue}, {meeting_date}, {category})"
        )
        return meeting

    @staticmethod
    def get_by_id(session: Session, meeting_id: str) -> Meeting | None:
        """Get meeting by ID.

        Args:
            session: Database session
            meeting_id: Meeting ID (string)

        Returns:
            Meeting or None
        """
        return session.query(Meeting).filter(Meeting.id == meeting_id).first()

    @staticmethod
    def get_by_date_range(
        session: Session, date_from: date, date_to: date
    ) -> list[Meeting]:
        """Get meetings in date range.

        Args:
            session: Database session
            date_from: Start date (inclusive)
            date_to: End date (inclusive)

        Returns:
            List of meetings ordered by meeting date
        """
        return (
            session.query(Meeting)
            .filter(Meeting.meeting_date >= date_from, Meeting.meeting_date <= date_to)
            .order_by(Meeting.meeting_date)
            .all()
        )
class RaceRepository:
    """Repository for race operations."""

    @staticmethod
    def upsert(session: Session, meeting_id: str, race_data: dict[str, Any]) -> Race:
        """Upsert race from TAB API data.

        The race is matched on (meeting_id, race_number); an existing row is
        updated in place, otherwise a new one is inserted.

        Args:
            session: Database session
            meeting_id: Parent meeting ID (string)
            race_data: Raw race data from TAB API (from EventRaceDetails.race)

        Returns:
            Race instance

        Raises:
            ValueError: If the payload has no race_number.
        """
        race_data = normalize_race_data(race_data)

        race_number = race_data.get("race_number")
        if not race_number:
            raise ValueError("Race data missing race_number")

        # TAB event ID (string)
        tab_event_id = race_data.get("event_id")

        # TAB API returns distance as integer (metres) under "distance";
        # payloads that only carry the normalized "distance_m" key must not
        # lose the value, so fall back to it.
        distance_m = race_data.get("distance")
        if distance_m is None:
            distance_m = race_data.get("distance_m")

        # Column values shared by the update and insert paths.
        fields = {
            "tab_event_id": tab_event_id,
            "distance_m": distance_m,
            # Start type (Mobile, Standing for harness)
            "start_type": race_data.get("start_type"),
            # Gait (Pace, Trot for harness)
            "gait": race_data.get("gait"),
            "weather": race_data.get("weather"),
            "track_condition": race_data.get("track_condition"),
            "race_datetime": RaceRepository._parse_race_datetime(race_data),
            "raw_json": race_data,
        }

        # Find existing race by composite key
        existing = (
            session.query(Race)
            .filter(Race.meeting_id == meeting_id, Race.race_number == race_number)
            .first()
        )

        if existing:
            for attr, value in fields.items():
                setattr(existing, attr, value)
            session.flush()
            logger.debug(f"Updated race {meeting_id}/{race_number}")
            return existing

        race = Race(meeting_id=meeting_id, race_number=race_number, **fields)
        session.add(race)
        session.flush()
        logger.debug(
            f"Inserted race {meeting_id}/{race_number} (event_id={tab_event_id})"
        )
        return race

    @staticmethod
    def _parse_race_datetime(race_data: dict[str, Any]) -> datetime | None:
        """Parse race datetime from known TAB/HRNZ fields.

        String/datetime candidates are checked first, in a fixed priority
        order; if none yields a value, the ``advertised_start`` dict is
        probed for an epoch timestamp, which is interpreted as UTC.

        Returns:
            The first datetime found, or None.
        """
        candidates = [
            race_data.get("advertised_start_string"),
            race_data.get("race_datetime"),
            race_data.get("start_time"),
            race_data.get("start_datetime"),
            race_data.get("advertised_start_time"),
        ]
        raw_json = race_data.get("raw_json")
        if isinstance(raw_json, dict):
            candidates.append(raw_json.get("race_datetime"))

        for value in candidates:
            if isinstance(value, datetime):
                return value
            if isinstance(value, str) and value.strip():
                parsed = parse_tab_datetime(value)
                if parsed:
                    return parsed

        advertised_start = race_data.get("advertised_start")
        if isinstance(advertised_start, dict):
            for key in ("seconds", "secs", "time"):
                if key not in advertised_start:
                    continue
                try:
                    timestamp = int(advertised_start[key])
                    return datetime.fromtimestamp(timestamp, tz=UTC)
                except (TypeError, ValueError, OverflowError, OSError):
                    # Non-numeric or out-of-range timestamp: try the next key
                    # instead of aborting the whole upsert.
                    continue

        return None

    @staticmethod
    def get_by_meeting(session: Session, meeting_id: str) -> list[Race]:
        """Get all races for a meeting.

        Args:
            session: Database session
            meeting_id: Meeting ID (string)

        Returns:
            List of races ordered by race number
        """
        return (
            session.query(Race)
            .filter(Race.meeting_id == meeting_id)
            .order_by(Race.race_number)
            .all()
        )

    @staticmethod
    def get_races_for_recompute(
        session: Session, date_from: date, date_to: date
    ) -> list[Race]:
        """Get races in date range, ordered chronologically.

        Args:
            session: Database session
            date_from: Start date (inclusive)
            date_to: End date (inclusive)

        Returns:
            List of races ordered by meeting date, then race number; Race.id
            breaks ties so the order is deterministic when several meetings
            share a date.
        """
        return (
            session.query(Race)
            .join(Meeting)
            .filter(
                Meeting.meeting_date >= date_from,
                Meeting.meeting_date <= date_to,
            )
            .order_by(Meeting.meeting_date, Race.race_number, Race.id)
            .all()
        )
class HorseRepository:
    """Repository for horse operations."""

    @staticmethod
    def upsert(
        session: Session,
        horse_id: int,
        name: str,
        raw_data: dict[str, Any] | None = None,
    ) -> Horse:
        """Upsert horse from TAB runner data.

        Args:
            session: Database session
            horse_id: TAB horse_id (integer)
            name: Horse name
            raw_data: Optional raw runner data for metadata

        Returns:
            Horse instance reflecting the row as stored after the upsert

        Raises:
            ValueError: If horse_id is missing (falsy).
        """
        if not horse_id:
            raise ValueError("Horse data missing ID")

        stmt = insert(Horse).values(
            id=horse_id,
            name=name,
            raw_json=raw_data,
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "name": stmt.excluded.name,
                "raw_json": stmt.excluded.raw_json,
                "updated_at": stmt.excluded.updated_at,
            },
        )

        session.execute(stmt)
        session.flush()

        # The upsert bypasses the ORM identity map; populate_existing()
        # refreshes a Horse already loaded in this session so the caller
        # does not receive stale attributes.
        return (
            session.query(Horse)
            .populate_existing()
            .filter(Horse.id == horse_id)
            .one()
        )
class DriverRepository:
    """Repository for driver operations.

    TAB API only provides driver names, not IDs.
    IDs are generated from name hash when no ID is provided.
    """

    @staticmethod
    def upsert(session: Session, name: str, driver_id: int | None = None) -> Driver:
        """Upsert driver by name.

        Driver ID is generated from name hash when not provided since
        TAB API only provides names (in the "jockey" field for harness).

        Args:
            session: Database session
            name: Driver name
            driver_id: Optional explicit ID from upstream sources

        Returns:
            Driver instance reflecting the row as stored after the upsert

        Raises:
            ValueError: If the name is empty or whitespace-only.
        """
        if not name or not name.strip():
            raise ValueError("Driver name is required")

        if driver_id is None:
            driver_id = generate_driver_id(name)

        stmt = insert(Driver).values(
            id=driver_id,
            name=name.strip(),
            # raw_json keeps the name exactly as received (unstripped).
            raw_json={"name": name},
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "name": stmt.excluded.name,
                "raw_json": stmt.excluded.raw_json,
                "updated_at": stmt.excluded.updated_at,
            },
        )

        session.execute(stmt)
        session.flush()

        # The upsert bypasses the ORM identity map; populate_existing()
        # refreshes a Driver already loaded in this session so the caller
        # does not receive stale attributes.
        return (
            session.query(Driver)
            .populate_existing()
            .filter(Driver.id == driver_id)
            .one()
        )
class TrainerRepository:
    """Repository for trainer operations.

    TAB API only provides trainer names, not IDs.
    IDs are generated from name hash when no ID is provided.
    """

    @staticmethod
    def upsert(session: Session, name: str, trainer_id: int | None = None) -> Trainer:
        """Upsert trainer by name.

        Trainer ID is generated from name hash when not provided since
        TAB API only provides names (in the "trainer_name" field).

        Args:
            session: Database session
            name: Trainer name
            trainer_id: Optional explicit ID from upstream sources

        Returns:
            Trainer instance reflecting the row as stored after the upsert

        Raises:
            ValueError: If the name is empty or whitespace-only.
        """
        if not name or not name.strip():
            raise ValueError("Trainer name is required")

        if trainer_id is None:
            trainer_id = generate_trainer_id(name)

        stmt = insert(Trainer).values(
            id=trainer_id,
            name=name.strip(),
            # raw_json keeps the name exactly as received (unstripped).
            raw_json={"name": name},
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "name": stmt.excluded.name,
                "raw_json": stmt.excluded.raw_json,
                "updated_at": stmt.excluded.updated_at,
            },
        )

        session.execute(stmt)
        session.flush()

        # The upsert bypasses the ORM identity map; populate_existing()
        # refreshes a Trainer already loaded in this session so the caller
        # does not receive stale attributes.
        return (
            session.query(Trainer)
            .populate_existing()
            .filter(Trainer.id == trainer_id)
            .one()
        )
class StarterRepository:
    """Repository for starter (runner) operations."""

    @staticmethod
    def _find_existing(
        session: Session,
        race_id: int,
        horse_id: int | None,
        runner_number: Any,
    ) -> Starter | None:
        """Locate a stored starter by race + horse, else by race + runner number."""
        match = None
        if horse_id:
            match = (
                session.query(Starter)
                .filter(Starter.race_id == race_id, Starter.horse_id == horse_id)
                .first()
            )
        if match is None and runner_number is not None:
            match = (
                session.query(Starter)
                .filter(
                    Starter.race_id == race_id,
                    Starter.runner_number == runner_number,
                )
                .first()
            )
        return match

    @staticmethod
    def upsert(
        session: Session,
        race_id: int,
        runner_data: dict[str, Any],
        placing: int | None = None,
    ) -> Starter | None:
        """Upsert starter/runner from TAB API data.

        The payload is normalized first. Related horse, driver and trainer
        rows are upserted as needed, then the starter is matched on
        race + horse (falling back to race + runner number) and updated or
        inserted. A change between two non-null placings is written to the
        audit log.

        Args:
            session: Database session
            race_id: Parent race ID
            runner_data: Runner data from TAB API (EventRunner)
            placing: Optional placing from results (matched by entrant_id)

        Returns:
            Starter instance or None if scratched
        """
        payload = normalize_runner_data(runner_data)

        # Scratched runners are never stored.
        # NOTE(review): a starter stored earlier and scratched later is left
        # untouched here - confirm that is the intended handling.
        if payload.get("is_scratched", False):
            logger.debug(f"Skipping scratched runner: {payload.get('name')}")
            return None

        # Horse - TAB provides horse_id as integer
        horse_id = payload.get("horse_id")
        horse_name = payload.get("name", "Unknown")
        if horse_id:
            HorseRepository.upsert(session, horse_id, horse_name, payload)

        # Driver comes from the "jockey" field (TAB uses jockey for all
        # racing types; for harness this is the driver).
        driver_label = payload.get("jockey") or payload.get("driver_name")
        driver_id = None
        if driver_label and driver_label.strip():
            driver_id = DriverRepository.upsert(
                session, driver_label, driver_id=payload.get("driver_id")
            ).id
        elif payload.get("driver_id") is not None:
            driver_id = payload.get("driver_id")

        # Trainer comes from the "trainer_name" field
        trainer_label = payload.get("trainer_name")
        trainer_id = None
        if trainer_label and trainer_label.strip():
            trainer_id = TrainerRepository.upsert(
                session, trainer_label, trainer_id=payload.get("trainer_id")
            ).id
        elif payload.get("trainer_id") is not None:
            trainer_id = payload.get("trainer_id")

        # Runner/saddlecloth number
        runner_number = payload.get("runner_number")

        # Handicap in meters: prefer "handicap", fall back to "handicap_m"
        handicap_m = payload.get("handicap")
        if handicap_m is None:
            handicap_m = payload.get("handicap_m")
        if handicap_m:
            handicap_m = int(handicap_m)

        # Placing normally arrives separately from the results array; without
        # it, use whatever the runner payload itself carries.
        # NOTE(review): did_not_finish is always False - no DNF detection is
        # implemented, and updates reset any stored value.
        did_not_finish = False
        if placing is None:
            placing = payload.get("placing")
            if isinstance(placing, str) and placing.strip().isdigit():
                placing = int(placing.strip())

        # Columns written identically by the update and insert paths.
        shared_fields = {
            "driver_id": driver_id,
            "trainer_id": trainer_id,
            "runner_number": runner_number,
            # TAB provides barrier as integer directly
            "barrier": payload.get("barrier"),
            # Harness-specific barrier position (e.g., "1F" front row, "2B" back)
            "barrier_position": payload.get("barrier_position"),
            "handicap_m": handicap_m,
            "placing": placing,
            "did_not_finish": did_not_finish,
            "raw_json": payload,
        }

        existing = StarterRepository._find_existing(
            session, race_id, horse_id, runner_number
        )

        if not existing:
            starter = Starter(race_id=race_id, horse_id=horse_id, **shared_fields)
            session.add(starter)
            session.flush()
            return starter

        # Only a change between two known placings counts as a correction.
        # NOTE(review): a None placing still overwrites a stored placing
        # below, without an audit entry - confirm this is intended.
        # NOTE(review): horse_id is not updated when the match came from the
        # runner-number fallback - confirm this is intended.
        old_placing = existing.placing
        placing_changed = not (
            placing is None or old_placing is None or placing == old_placing
        )

        for attr, value in shared_fields.items():
            setattr(existing, attr, value)
        session.flush()

        if placing_changed:
            AuditLogger.log_change(
                session=session,
                table_name="starters",
                record_id=str(existing.id),
                action="CORRECT",
                old_values={"placing": old_placing},
                new_values={"placing": placing},
                changed_by="system",
                change_reason=(
                    f"Placing corrected from {old_placing} to {placing} "
                    f"for starter {existing.id} in race {race_id}"
                ),
            )

        return existing

    @staticmethod
    def get_by_race(session: Session, race_id: int) -> list[Starter]:
        """Get all starters for a race.

        Args:
            session: Database session
            race_id: Race ID

        Returns:
            List of starters
        """
        starters_query = session.query(Starter).filter(Starter.race_id == race_id)
        return starters_query.all()
class RatingSnapshotRepository:
    """Repository for rating snapshot operations."""

    @staticmethod
    def upsert(
        session: Session,
        entity_type: EntityType,
        entity_id: int,
        as_of_race_id: int,
        rating: float,
        rd: float | None = None,
        meta: dict[str, Any] | None = None,
    ) -> RatingSnapshot:
        """Upsert rating snapshot.

        Args:
            session: Database session
            entity_type: Type of entity (horse/driver/trainer)
            entity_id: Entity ID
            as_of_race_id: Race ID this rating is computed after
            rating: Elo rating value
            rd: Rating deviation (optional)
            meta: Additional metadata (optional)

        Returns:
            RatingSnapshot instance reflecting the row as stored after the upsert
        """
        stmt = insert(RatingSnapshot).values(
            entity_type=entity_type,
            entity_id=entity_id,
            as_of_race_id=as_of_race_id,
            rating=rating,
            rd=rd,
            meta=meta,
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["entity_type", "entity_id", "as_of_race_id"],
            set_={
                "rating": stmt.excluded.rating,
                "rd": stmt.excluded.rd,
                "meta": stmt.excluded.meta,
            },
        )

        session.execute(stmt)
        session.flush()

        # The upsert bypasses the ORM identity map; populate_existing()
        # refreshes a snapshot already loaded in this session so the caller
        # sees the rating that was just written.
        return (
            session.query(RatingSnapshot)
            .populate_existing()
            .filter(
                RatingSnapshot.entity_type == entity_type,
                RatingSnapshot.entity_id == entity_id,
                RatingSnapshot.as_of_race_id == as_of_race_id,
            )
            .one()
        )

    @staticmethod
    def get_latest_rating(
        session: Session,
        entity_type: EntityType,
        entity_id: int,
        before_race_id: int | None = None,
    ) -> RatingSnapshot | None:
        """Get latest rating for an entity.

        Args:
            session: Database session
            entity_type: Type of entity
            entity_id: Entity ID
            before_race_id: Only consider ratings before this race (exclusive)

        Returns:
            Latest RatingSnapshot or None (also None when before_race_id
            refers to a race that does not exist or has no meeting)
        """
        query = (
            session.query(RatingSnapshot)
            .join(Race, RatingSnapshot.as_of_race_id == Race.id)
            .join(Meeting, Race.meeting_id == Meeting.id)
            .filter(
                RatingSnapshot.entity_type == entity_type,
                RatingSnapshot.entity_id == entity_id,
            )
        )

        if before_race_id is not None:
            target_race = (
                session.query(Race)
                .options(joinedload(Race.meeting))
                .filter(Race.id == before_race_id)
                .first()
            )
            if target_race is None or target_race.meeting is None:
                return None

            target_date = target_race.meeting.meeting_date
            # "Earlier" judged from the schedule alone: a prior meeting date,
            # or a lower race number within the same meeting.
            earlier_by_schedule = or_(
                Meeting.meeting_date < target_date,
                and_(
                    Meeting.meeting_date == target_date,
                    Race.meeting_id == target_race.meeting_id,
                    Race.race_number < target_race.race_number,
                ),
            )

            if target_race.race_datetime:
                # A plain "race_datetime < target" comparison is never true
                # for NULL, which would silently drop snapshots of races
                # stored without a datetime; those fall back to the
                # schedule-based test.
                query = query.filter(
                    or_(
                        Race.race_datetime < target_race.race_datetime,
                        and_(Race.race_datetime.is_(None), earlier_by_schedule),
                    )
                )
            else:
                query = query.filter(earlier_by_schedule)

        return query.order_by(
            Meeting.meeting_date.desc(),
            Race.race_datetime.desc().nulls_last(),
            Race.race_number.desc(),
            Race.id.desc(),
        ).first()

    @staticmethod
    def get_top_ratings(
        session: Session,
        entity_type: EntityType,
        limit: int = 100,
        as_of_date: date | None = None,
    ) -> list[RatingSnapshot]:
        """Get top-rated entities.

        Live queries (no as_of_date) cache the resulting snapshot IDs for
        60 seconds; a cache hit re-fetches those rows by ID.

        Args:
            session: Database session
            entity_type: Type of entity
            limit: Maximum results
            as_of_date: Only include ratings up to this date

        Returns:
            List of latest rating snapshots, sorted by rating descending
        """
        # Use cache when querying live data (no as_of_date filter)
        cache_key = None
        if as_of_date is None:
            cache_key = f"top_ratings:{entity_type.value}:{limit}:live"
            cached = cache_get(cache_key)
            if cached is not None:
                ids = cached.get("snapshot_ids", [])
                if not ids:
                    return []
                # Fetch full objects from DB by ID (preserves ORM state)
                return (
                    session.query(RatingSnapshot)
                    .filter(RatingSnapshot.id.in_(ids))
                    .order_by(desc(RatingSnapshot.rating))
                    .all()
                )

        # Most recent snapshot first within each entity.
        sort_order = (
            Meeting.meeting_date.desc(),
            Race.race_datetime.desc().nulls_last(),
            Race.race_number.desc(),
            Race.id.desc(),
        )

        # Number each entity's snapshots newest-first; rn == 1 is the latest.
        ranked = (
            session.query(
                RatingSnapshot.id.label("snapshot_id"),
                RatingSnapshot.entity_id,
                func.row_number()
                .over(partition_by=RatingSnapshot.entity_id, order_by=sort_order)
                .label("rn"),
            )
            .join(Race, RatingSnapshot.as_of_race_id == Race.id)
            .join(Meeting, Race.meeting_id == Meeting.id)
            .filter(RatingSnapshot.entity_type == entity_type)
        )

        if as_of_date is not None:
            ranked = ranked.filter(Meeting.meeting_date <= as_of_date)

        ranked = ranked.subquery()

        results = (
            session.query(RatingSnapshot)
            .join(ranked, RatingSnapshot.id == ranked.c.snapshot_id)
            .filter(ranked.c.rn == 1)
            .order_by(desc(RatingSnapshot.rating))
            .limit(limit)
            .all()
        )

        # Cache the snapshot IDs for live queries (TTL: 60 seconds); empty
        # results are not cached.
        if cache_key is not None and results:
            cache_set(cache_key, {"snapshot_ids": [r.id for r in results]}, ttl=60)

        return results
class BarrierAdjustmentRepository:
    """Repository for barrier adjustment operations."""

    @staticmethod
    def upsert(
        session: Session,
        venue: str | None,
        start_type: str | None,
        distance_bucket: str,
        barrier: int,
        adjustment: float,
        sample_count: int = 1,
    ) -> BarrierAdjustment:
        """Upsert barrier adjustment.

        Args:
            session: Database session
            venue: Venue name or None for global
            start_type: Start type or None for any
            distance_bucket: Distance bucket string
            barrier: Barrier number
            adjustment: Adjustment value
            sample_count: Number of observations

        Returns:
            BarrierAdjustment instance reflecting the row as stored
        """
        stmt = insert(BarrierAdjustment).values(
            venue=venue,
            start_type=start_type,
            distance_bucket=distance_bucket,
            barrier=barrier,
            adjustment=adjustment,
            sample_count=sample_count,
        )
        # NOTE(review): with a plain unique index PostgreSQL treats NULLs as
        # distinct, so rows with venue/start_type = None would never hit this
        # conflict target and would be duplicated - confirm how the index is
        # declared.
        stmt = stmt.on_conflict_do_update(
            index_elements=["venue", "start_type", "distance_bucket", "barrier"],
            set_={
                "adjustment": stmt.excluded.adjustment,
                "sample_count": stmt.excluded.sample_count,
                "updated_at": stmt.excluded.updated_at,
            },
        )

        session.execute(stmt)
        session.flush()

        # The upsert bypasses the ORM identity map; populate_existing()
        # refreshes a row already loaded in this session so the caller does
        # not receive stale values.
        return (
            session.query(BarrierAdjustment)
            .populate_existing()
            .filter(
                BarrierAdjustment.venue == venue,
                BarrierAdjustment.start_type == start_type,
                BarrierAdjustment.distance_bucket == distance_bucket,
                BarrierAdjustment.barrier == barrier,
            )
            .one()
        )

    @staticmethod
    def get_all(session: Session) -> list[BarrierAdjustment]:
        """Get all barrier adjustments.

        Args:
            session: Database session

        Returns:
            List of all barrier adjustments
        """
        return session.query(BarrierAdjustment).all()

    @staticmethod
    def increment_sample(
        session: Session,
        venue: str | None,
        start_type: str | None,
        distance_bucket: str,
        barrier: int,
        delta: float,
        learning_rate: float,
    ) -> None:
        """Incrementally update barrier adjustment.

        An existing row is modified on the ORM object (no flush is issued
        here); when no row exists one is created through ``upsert``.

        Args:
            session: Database session
            venue: Venue name or None
            start_type: Start type or None
            distance_bucket: Distance bucket
            barrier: Barrier number
            delta: Performance delta to incorporate
            learning_rate: Learning rate for update
        """
        existing = (
            session.query(BarrierAdjustment)
            .filter(
                BarrierAdjustment.venue == venue,
                BarrierAdjustment.start_type == start_type,
                BarrierAdjustment.distance_bucket == distance_bucket,
                BarrierAdjustment.barrier == barrier,
            )
            .first()
        )

        step = learning_rate * delta

        if existing:
            # Incremental update: adj_new = adj_old + lr * delta
            existing.adjustment = existing.adjustment + step
            existing.sample_count = existing.sample_count + 1
        else:
            # Initialize with first observation
            BarrierAdjustmentRepository.upsert(
                session,
                venue=venue,
                start_type=start_type,
                distance_bucket=distance_bucket,
                barrier=barrier,
                adjustment=step,
                sample_count=1,
            )
class HandicapAdjustmentRepository:
    """Repository for handicap adjustment operations."""

    @staticmethod
    def upsert(
        session: Session,
        venue: str | None,
        start_type: str | None,
        distance_bucket: str,
        handicap_m: int,
        adjustment: float,
        sample_count: int = 1,
    ) -> HandicapAdjustment:
        """Upsert handicap adjustment.

        Args:
            session: Database session
            venue: Venue name or None for global
            start_type: Start type or None for any
            distance_bucket: Distance bucket string
            handicap_m: Handicap in meters
            adjustment: Adjustment value
            sample_count: Number of observations

        Returns:
            HandicapAdjustment instance reflecting the row as stored
        """
        stmt = insert(HandicapAdjustment).values(
            venue=venue,
            start_type=start_type,
            distance_bucket=distance_bucket,
            handicap_m=handicap_m,
            adjustment=adjustment,
            sample_count=sample_count,
        )
        # NOTE(review): with a plain unique index PostgreSQL treats NULLs as
        # distinct, so rows with venue/start_type = None would never hit this
        # conflict target and would be duplicated - confirm how the index is
        # declared.
        stmt = stmt.on_conflict_do_update(
            index_elements=["venue", "start_type", "distance_bucket", "handicap_m"],
            set_={
                "adjustment": stmt.excluded.adjustment,
                "sample_count": stmt.excluded.sample_count,
                "updated_at": stmt.excluded.updated_at,
            },
        )

        session.execute(stmt)
        session.flush()

        # The upsert bypasses the ORM identity map; populate_existing()
        # refreshes a row already loaded in this session so the caller does
        # not receive stale values.
        return (
            session.query(HandicapAdjustment)
            .populate_existing()
            .filter(
                HandicapAdjustment.venue == venue,
                HandicapAdjustment.start_type == start_type,
                HandicapAdjustment.distance_bucket == distance_bucket,
                HandicapAdjustment.handicap_m == handicap_m,
            )
            .one()
        )

    @staticmethod
    def get_all(session: Session) -> list[HandicapAdjustment]:
        """Get all handicap adjustments.

        Args:
            session: Database session

        Returns:
            List of all handicap adjustments
        """
        return session.query(HandicapAdjustment).all()

    @staticmethod
    def increment_sample(
        session: Session,
        venue: str | None,
        start_type: str | None,
        distance_bucket: str,
        handicap_m: int,
        delta: float,
        learning_rate: float,
    ) -> None:
        """Incrementally update handicap adjustment.

        An existing row is modified on the ORM object (no flush is issued
        here); when no row exists one is created through ``upsert``.

        Args:
            session: Database session
            venue: Venue name or None
            start_type: Start type or None
            distance_bucket: Distance bucket
            handicap_m: Handicap in meters
            delta: Performance delta to incorporate
            learning_rate: Learning rate for update
        """
        existing = (
            session.query(HandicapAdjustment)
            .filter(
                HandicapAdjustment.venue == venue,
                HandicapAdjustment.start_type == start_type,
                HandicapAdjustment.distance_bucket == distance_bucket,
                HandicapAdjustment.handicap_m == handicap_m,
            )
            .first()
        )

        step = learning_rate * delta

        if existing:
            # Incremental update: adj_new = adj_old + lr * delta
            existing.adjustment = existing.adjustment + step
            existing.sample_count = existing.sample_count + 1
        else:
            # Initialize with first observation
            HandicapAdjustmentRepository.upsert(
                session,
                venue=venue,
                start_type=start_type,
                distance_bucket=distance_bucket,
                handicap_m=handicap_m,
                adjustment=step,
                sample_count=1,
            )