Coverage for packages / core / storage / repositories.py: 24%

378 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 08:37 +1200

1"""Repository layer for database operations with idempotent upserts. 

2 

3Supports TAB Affiliates API data structures. 

4""" 

5 

6import copy 

7import hashlib 

8from datetime import UTC, date, datetime 

9from typing import Any 

10 

11from sqlalchemy import and_, desc, func, or_ 

12from sqlalchemy.dialects.postgresql import insert 

13from sqlalchemy.orm import Session, joinedload 

14 

15from packages.core.common.cache import cache_get, cache_set 

16from packages.core.common.logging import get_logger 

17from packages.core.storage.audit import AuditLogger 

18from packages.core.storage.models import ( 

19 BarrierAdjustment, 

20 Driver, 

21 EntityType, 

22 HandicapAdjustment, 

23 Horse, 

24 Meeting, 

25 Race, 

26 RatingSnapshot, 

27 Starter, 

28 Trainer, 

29) 

30 

# Module-level logger from the project's logging helper, named after this module.
logger = get_logger(__name__)

32 

33_HRNZ_ID_MODULUS = 2147483647 

34 

35 

36def normalize_entity_id(value: Any, fallback_name: str | None = None) -> int | None: 

37 """Normalize entity identifiers across TAB/HRNZ sources.""" 

38 if value is None and not fallback_name: 

39 return None 

40 

41 if isinstance(value, int): 

42 return value 

43 

44 if isinstance(value, str): 

45 stripped = value.strip() 

46 if stripped.isdigit(): 

47 return int(stripped) 

48 if stripped: 

49 return ( 

50 int(hashlib.md5(stripped.encode()).hexdigest()[:8], 16) 

51 % _HRNZ_ID_MODULUS 

52 ) 

53 

54 if fallback_name: 

55 return ( 

56 int(hashlib.md5(fallback_name.encode()).hexdigest()[:8], 16) 

57 % _HRNZ_ID_MODULUS 

58 ) 

59 

60 return None 

61 

62 

def _name_for_id_fallback(name: Any) -> Any:
    """Return *name* for use as an ID fallback, or ``None`` if it is blank.

    Empty and whitespace-only strings map to ``None``; any other value is
    returned unchanged.
    """
    if isinstance(name, str) and not name.strip():
        return None
    return name


def normalize_runner_data(runner_data: dict[str, Any]) -> dict[str, Any]:
    """Normalize runner payloads from TAB/HRNZ into a consistent shape.

    Works on a shallow copy, so the caller's dict is not modified (nested
    values are shared with it). After normalization:

    * ``name`` and ``horse_name`` both hold the horse name, taken from
      ``name``, ``horse_name`` or ``horse`` (``"Unknown"`` if none is set).
    * ``jockey`` and ``driver_name`` both hold the driver name when one of
      ``jockey`` / ``driver_name`` / ``driver`` is set; ``trainer_name`` is
      filled from ``trainer`` in the same way.
    * ``horse_id`` / ``driver_id`` / ``trainer_id`` are resolved from their
      snake_case, camelCase or ``*_uuid`` variants via
      :func:`normalize_entity_id`, falling back to a hash of the name.
      Blank driver/trainer names are not used as a fallback.
    * ``runner_number``, ``barrier``, ``handicap_m`` and ``handicap`` are
      filled from their alternative keys when absent.
    * A decimal-string ``placing`` is converted to ``int``.
    * ``barrier_position``, ``margin``, ``race_time``, ``placing``,
      ``barrier``, ``handicap_m``, ``runner_number``, ``driver_name`` and
      ``trainer_name`` always exist (``None`` when unknown).

    Args:
        runner_data: Raw runner payload; a falsy value is treated as ``{}``.

    Returns:
        The normalized copy.
    """
    data = dict(runner_data or {})

    horse_name = (
        data.get("name") or data.get("horse_name") or data.get("horse") or "Unknown"
    )
    data["name"] = horse_name
    data["horse_name"] = horse_name

    horse_id = data.get("horse_id") or data.get("horseId") or data.get("horse_uuid")
    # NOTE(review): runners with neither an ID nor a name all hash the
    # placeholder "Unknown" to the same horse_id; confirm that collision is
    # acceptable for (race_id, horse_id) starter matching downstream.
    data["horse_id"] = normalize_entity_id(horse_id, fallback_name=horse_name)

    driver_name = data.get("jockey") or data.get("driver_name") or data.get("driver")
    if driver_name:
        data["driver_name"] = driver_name
        data["jockey"] = driver_name

    trainer_name = data.get("trainer_name") or data.get("trainer")
    if trainer_name:
        data["trainer_name"] = trainer_name

    # StarterRepository skips whitespace-only driver/trainer names (no row is
    # upserted) but still stores any driver_id/trainer_id it finds. A hashed
    # ID derived from a blank name would therefore point at a row that is
    # never written, so blank names are not used as the hash fallback.
    driver_id = data.get("driver_id") or data.get("driverId") or data.get("driver_uuid")
    data["driver_id"] = normalize_entity_id(
        driver_id, fallback_name=_name_for_id_fallback(driver_name)
    )

    trainer_id = (
        data.get("trainer_id") or data.get("trainerId") or data.get("trainer_uuid")
    )
    data["trainer_id"] = normalize_entity_id(
        trainer_id, fallback_name=_name_for_id_fallback(trainer_name)
    )

    if "runner_number" not in data:
        data["runner_number"] = data.get("number") or data.get("runnerNo")

    if "barrier" not in data:
        data["barrier"] = data.get("barrier_draw")

    # Keep handicap_m and handicap mirrored when only one of them is present.
    if "handicap_m" not in data:
        data["handicap_m"] = data.get("handicap")

    if "handicap" not in data:
        data["handicap"] = data.get("handicap_m")

    placing_value = data.get("placing")
    # isdecimal() (not isdigit()): int() rejects digits like "²" that
    # isdigit() accepts.
    if isinstance(placing_value, str) and placing_value.strip().isdecimal():
        data["placing"] = int(placing_value.strip())

    for key in (
        "barrier_position",
        "margin",
        "race_time",
        "placing",
        "barrier",
        "handicap_m",
        "runner_number",
        "driver_name",
        "trainer_name",
    ):
        data.setdefault(key, None)

    return data

123 

124 

def normalize_race_data(race_data: dict[str, Any]) -> dict[str, Any]:
    """Return a deep-copied race payload with consistent distance and nesting.

    ``distance_m`` is back-filled from ``distance`` at the top level when it
    is missing. If ``raw_json`` is a dict, it inherits ``distance_m`` and
    ``race_number`` from the top level (only when it lacks the key and the
    top-level value is not ``None``). Every dict entry in its ``starters``
    list is passed through :func:`normalize_runner_data`; other entries are
    kept as-is. The caller's object is never mutated.

    Args:
        race_data: Raw race payload; a falsy value is treated as ``{}``.

    Returns:
        The normalized deep copy.
    """
    normalized = copy.deepcopy(race_data or {})

    if "distance" in normalized and "distance_m" not in normalized:
        normalized["distance_m"] = normalized.get("distance")

    raw = normalized.get("raw_json")
    if not isinstance(raw, dict):
        return normalized

    # `raw` is the dict stored under normalized["raw_json"], so edits to it
    # are reflected in the returned payload.
    for field in ("distance_m", "race_number"):
        top_level_value = normalized.get(field)
        if field not in raw and top_level_value is not None:
            raw[field] = top_level_value

    runners = raw.get("starters")
    if isinstance(runners, list):
        raw["starters"] = [
            normalize_runner_data(entry) if isinstance(entry, dict) else entry
            for entry in runners
        ]

    return normalized

149 

150 

def generate_driver_id(name: str) -> int:
    """Generate deterministic integer ID from driver name.

    The name is stripped and upper-cased, hashed with SHA-256, and the first
    4 bytes are read big-endian and masked to 31 bits, giving an ID in
    ``[0, 2**31)``.

    IDs are not guaranteed unique. The space has ~2.1e9 values, so by the
    birthday bound there is roughly a 50% chance of at least one collision
    among ~55,000 distinct names, and ~90% at 100,000.

    Args:
        name: Driver name.

    Returns:
        Non-negative integer ID.

    Raises:
        ValueError: If *name* is empty or whitespace-only.
    """
    # Reject whitespace-only names too; they would otherwise all collapse to
    # the hash of the empty string.
    if not name or not name.strip():
        raise ValueError("Cannot generate ID from empty name")
    normalized = name.strip().upper()
    hash_bytes = hashlib.sha256(normalized.encode("utf-8")).digest()
    return int.from_bytes(hash_bytes[:4], "big") & 0x7FFFFFFF

168 

169 

def generate_trainer_id(name: str) -> int:
    """Generate deterministic integer ID from trainer name.

    The name is stripped and upper-cased, hashed with SHA-256, and the first
    4 bytes are read big-endian and masked to 31 bits, giving an ID in
    ``[0, 2**31)``.

    IDs are not guaranteed unique. By the birthday bound there is roughly a
    50% chance of at least one collision among ~55,000 distinct names.

    Args:
        name: Trainer name.

    Returns:
        Non-negative integer ID.

    Raises:
        ValueError: If *name* is empty or whitespace-only.
    """
    # Reject whitespace-only names too; they would otherwise all collapse to
    # the hash of the empty string.
    if not name or not name.strip():
        raise ValueError("Cannot generate ID from empty name")
    normalized = name.strip().upper()
    hash_bytes = hashlib.sha256(normalized.encode("utf-8")).digest()
    return int.from_bytes(hash_bytes[:4], "big") & 0x7FFFFFFF

186 

187 

188def parse_tab_datetime(dt_string: str | None) -> datetime | None: 

189 """Parse TAB API datetime string. 

190 

191 Args: 

192 dt_string: ISO datetime string (e.g., "2024-01-15T14:30:00+13:00") 

193 

194 Returns: 

195 datetime object or None if parsing fails 

196 """ 

197 if not dt_string: 

198 return None 

199 try: 

200 # Try ISO format with timezone 

201 return datetime.fromisoformat(dt_string.replace("Z", "+00:00")) 

202 except ValueError: 

203 try: 

204 # Try without timezone 

205 return datetime.fromisoformat(dt_string[:19]) 

206 except ValueError: 

207 logger.warning(f"Failed to parse datetime: {dt_string}") 

208 return None 

209 

210 

211def parse_tab_date(date_string: str | None) -> date | None: 

212 """Parse TAB API date from datetime string or date string. 

213 

214 Args: 

215 date_string: Date string (e.g., "2024-01-15" or "20240115" or full datetime) 

216 

217 Returns: 

218 date object or None if parsing fails 

219 """ 

220 if not date_string: 

221 return None 

222 try: 

223 # Try ISO date format YYYY-MM-DD 

224 if len(date_string) >= 10 and "-" in date_string: 

225 return date.fromisoformat(date_string[:10]) 

226 # Try YYYYMMDD format 

227 elif len(date_string) >= 8 and date_string[:8].isdigit(): 

228 return datetime.strptime(date_string[:8], "%Y%m%d").date() 

229 else: 

230 return None 

231 except ValueError: 

232 logger.warning(f"Failed to parse date: {date_string}") 

233 return None 

234 

235 

class MeetingRepository:
    """Repository for meeting operations."""

    @staticmethod
    def upsert(session: Session, meeting_data: dict[str, Any]) -> Meeting:
        """Upsert meeting from TAB API data.

        Runs ``INSERT ... ON CONFLICT (id) DO UPDATE`` keyed by the TAB
        ``meeting`` field, flushes, and reloads the row.

        Args:
            session: Database session
            meeting_data: Raw meeting data from TAB API (stored as ``raw_json``)

        Returns:
            Meeting instance

        Raises:
            ValueError: If the ``meeting`` ID is missing or neither ``date``
                nor ``meeting_date`` can be parsed.
        """
        # TAB API uses "meeting" as the ID field (string)
        meeting_id = meeting_data.get("meeting")
        if not meeting_id:
            raise ValueError("Meeting data missing 'meeting' ID")

        # Parse meeting date from various formats
        meeting_date_raw = meeting_data.get("date") or meeting_data.get("meeting_date")
        meeting_date = parse_tab_date(meeting_date_raw)
        if not meeting_date:
            raise ValueError(f"Could not parse meeting date: {meeting_date_raw}")

        # `or` rather than a .get() default so explicit null/empty values in
        # the payload also fall back instead of being written through.
        venue = meeting_data.get("name") or "Unknown"

        # Category: T (Thoroughbred), H (Harness), G (Greyhound)
        category = meeting_data.get("category") or "H"

        stmt = insert(Meeting).values(
            id=meeting_id,
            meeting_date=meeting_date,
            venue=venue,
            category=category,
            raw_json=meeting_data,
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "meeting_date": stmt.excluded.meeting_date,
                "venue": stmt.excluded.venue,
                "category": stmt.excluded.category,
                "raw_json": stmt.excluded.raw_json,
                # NOTE(review): EXCLUDED.updated_at is the value the INSERT
                # would have written, i.e. the column's insert default. Confirm
                # the model defines one, otherwise this writes NULL on update.
                "updated_at": stmt.excluded.updated_at,
            },
        )

        session.execute(stmt)
        session.flush()

        meeting = session.query(Meeting).filter(Meeting.id == meeting_id).one()
        logger.debug(
            f"Upserted meeting {meeting_id} ({venue}, {meeting_date}, {category})"
        )
        return meeting

    @staticmethod
    def get_by_id(session: Session, meeting_id: str) -> Meeting | None:
        """Get meeting by ID.

        Args:
            session: Database session
            meeting_id: Meeting ID (string)

        Returns:
            Meeting or None
        """
        return session.query(Meeting).filter(Meeting.id == meeting_id).first()

    @staticmethod
    def get_by_date_range(
        session: Session, date_from: date, date_to: date
    ) -> list[Meeting]:
        """Get meetings in date range, ordered by meeting date.

        Args:
            session: Database session
            date_from: Start date (inclusive)
            date_to: End date (inclusive)

        Returns:
            List of meetings
        """
        return (
            session.query(Meeting)
            .filter(Meeting.meeting_date >= date_from, Meeting.meeting_date <= date_to)
            .order_by(Meeting.meeting_date)
            .all()
        )

328 

329 

class RaceRepository:
    """Repository for race operations."""

    @staticmethod
    def upsert(session: Session, meeting_id: str, race_data: dict[str, Any]) -> Race:
        """Upsert race from TAB API data.

        The payload is normalized with :func:`normalize_race_data`. An existing
        race is located by ``(meeting_id, race_number)`` and updated in place;
        otherwise a new row is inserted. The session is flushed either way.

        Args:
            session: Database session
            meeting_id: Parent meeting ID (string)
            race_data: Raw race data from TAB API (from EventRaceDetails.race)

        Returns:
            Race instance

        Raises:
            ValueError: If ``race_number`` is missing or falsy.
        """
        race_data = normalize_race_data(race_data)

        race_number = race_data.get("race_number")
        if not race_number:
            raise ValueError("Race data missing race_number")

        # TAB event ID (string)
        tab_event_id = race_data.get("event_id")

        # TAB sends "distance" (metres). Payloads that only carry "distance_m"
        # used to lose their distance here, because normalize_race_data copies
        # distance -> distance_m but never the reverse.
        distance_m = race_data.get("distance")
        if distance_m is None:
            distance_m = race_data.get("distance_m")

        # Start type (Mobile, Standing for harness)
        start_type = race_data.get("start_type")

        # Gait (Pace, Trot for harness)
        gait = race_data.get("gait")

        weather = race_data.get("weather")
        track_condition = race_data.get("track_condition")

        race_datetime = RaceRepository._parse_race_datetime(race_data)

        # Find existing race by composite key
        existing = (
            session.query(Race)
            .filter(Race.meeting_id == meeting_id, Race.race_number == race_number)
            .first()
        )

        if existing:
            existing.tab_event_id = tab_event_id
            existing.distance_m = distance_m
            existing.start_type = start_type
            existing.gait = gait
            existing.weather = weather
            existing.track_condition = track_condition
            existing.race_datetime = race_datetime
            existing.raw_json = race_data
            session.flush()
            logger.debug(f"Updated race {meeting_id}/{race_number}")
            return existing

        race = Race(
            meeting_id=meeting_id,
            tab_event_id=tab_event_id,
            race_number=race_number,
            distance_m=distance_m,
            start_type=start_type,
            gait=gait,
            weather=weather,
            track_condition=track_condition,
            race_datetime=race_datetime,
            raw_json=race_data,
        )
        session.add(race)
        session.flush()
        logger.debug(
            f"Inserted race {meeting_id}/{race_number} (event_id={tab_event_id})"
        )
        return race

    @staticmethod
    def _parse_race_datetime(race_data: dict[str, Any]) -> datetime | None:
        """Parse race datetime from known TAB/HRNZ fields.

        Checks ``advertised_start_string``, ``race_datetime``, ``start_time``,
        ``start_datetime``, ``advertised_start_time`` and
        ``raw_json["race_datetime"]`` in that order. The first ``datetime``
        instance or parseable string wins. Failing that, the first usable
        epoch-seconds value under ``advertised_start["seconds" | "secs" |
        "time"]`` is converted to an aware UTC datetime.

        Returns:
            The parsed datetime, or ``None`` if nothing usable is found.
        """
        candidates = [
            race_data.get("advertised_start_string"),
            race_data.get("race_datetime"),
            race_data.get("start_time"),
            race_data.get("start_datetime"),
            race_data.get("advertised_start_time"),
        ]
        raw_json = race_data.get("raw_json")
        if isinstance(raw_json, dict):
            candidates.append(raw_json.get("race_datetime"))

        for value in candidates:
            if isinstance(value, datetime):
                return value
            if isinstance(value, str) and value.strip():
                parsed = parse_tab_datetime(value)
                if parsed:
                    return parsed

        advertised_start = race_data.get("advertised_start")
        if isinstance(advertised_start, dict):
            for key in ("seconds", "secs", "time"):
                if key not in advertised_start:
                    continue
                try:
                    timestamp = int(advertised_start[key])
                    return datetime.fromtimestamp(timestamp, tz=UTC)
                except (TypeError, ValueError, OverflowError, OSError):
                    # Non-numeric or out-of-range value (e.g. one given in
                    # milliseconds). Try the next key instead of failing the
                    # whole upsert.
                    continue

        return None

    @staticmethod
    def get_by_meeting(session: Session, meeting_id: str) -> list[Race]:
        """Get all races for a meeting, ordered by race number.

        Args:
            session: Database session
            meeting_id: Meeting ID (string)

        Returns:
            List of races
        """
        return (
            session.query(Race)
            .filter(Race.meeting_id == meeting_id)
            .order_by(Race.race_number)
            .all()
        )

    @staticmethod
    def get_races_for_recompute(
        session: Session, date_from: date, date_to: date
    ) -> list[Race]:
        """Get races in date range, ordered chronologically (oldest first).

        Ordering: meeting date, then race start time (races without one come
        first on their date), then race number, then race id as a
        deterministic tie-breaker. This is the exact reverse of the ordering
        ``RatingSnapshotRepository`` uses to pick the latest rating, so
        meetings running on the same day are interleaved by start time rather
        than by race number alone.

        Args:
            session: Database session
            date_from: Start date (inclusive)
            date_to: End date (inclusive)

        Returns:
            List of races ordered chronologically
        """
        return (
            session.query(Race)
            .join(Meeting)
            .filter(
                Meeting.meeting_date >= date_from,
                Meeting.meeting_date <= date_to,
            )
            .order_by(
                Meeting.meeting_date,
                Race.race_datetime.asc().nulls_first(),
                Race.race_number,
                Race.id,
            )
            .all()
        )

486 

487 

class HorseRepository:
    """Data-access helpers for horse rows."""

    @staticmethod
    def upsert(
        session: Session,
        horse_id: int,
        name: str,
        raw_data: dict[str, Any] | None = None,
    ) -> Horse:
        """Insert or refresh a horse keyed by its integer ID.

        On conflict, ``name`` and ``raw_json`` are overwritten with the values
        passed here (``raw_json`` is replaced even when *raw_data* is
        ``None``), and ``updated_at`` takes the inserted row's value. The
        session is flushed and the stored row re-read.

        Args:
            session: Database session.
            horse_id: TAB horse_id (integer); must be truthy.
            name: Horse name.
            raw_data: Optional raw runner payload stored as ``raw_json``.

        Returns:
            The persisted Horse.

        Raises:
            ValueError: If *horse_id* is falsy.
        """
        if not horse_id:
            raise ValueError("Horse data missing ID")

        insert_stmt = insert(Horse).values(id=horse_id, name=name, raw_json=raw_data)
        proposed = insert_stmt.excluded
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "name": proposed.name,
                "raw_json": proposed.raw_json,
                "updated_at": proposed.updated_at,
            },
        )

        session.execute(upsert_stmt)
        session.flush()

        return session.query(Horse).filter(Horse.id == horse_id).one()

530 

531 

class DriverRepository:
    """Data access for drivers.

    TAB API only provides driver names, not IDs, so when the caller supplies
    no ID one is derived from the name with :func:`generate_driver_id`.
    """

    @staticmethod
    def upsert(session: Session, name: str, driver_id: int | None = None) -> Driver:
        """Create or refresh a driver row.

        For harness racing the TAB API reports the driver in the ``jockey``
        field.

        Args:
            session: Database session.
            name: Driver name. Stored stripped in ``name`` and verbatim in
                ``raw_json``.
            driver_id: Explicit upstream ID. When ``None`` the ID is derived
                from *name*.

        Returns:
            The persisted Driver.

        Raises:
            ValueError: If *name* is empty or whitespace-only.
        """
        if not (name and name.strip()):
            raise ValueError("Driver name is required")

        resolved_id = driver_id if driver_id is not None else generate_driver_id(name)

        insert_stmt = insert(Driver).values(
            id=resolved_id,
            name=name.strip(),
            raw_json={"name": name},
        )
        proposed = insert_stmt.excluded
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "name": proposed.name,
                "raw_json": proposed.raw_json,
                "updated_at": proposed.updated_at,
            },
        )

        session.execute(upsert_stmt)
        session.flush()

        return session.query(Driver).filter(Driver.id == resolved_id).one()

578 

579 

class TrainerRepository:
    """Data access for trainers.

    TAB API only provides trainer names, not IDs, so when the caller supplies
    no ID one is derived from the name with :func:`generate_trainer_id`.
    """

    @staticmethod
    def upsert(session: Session, name: str, trainer_id: int | None = None) -> Trainer:
        """Create or refresh a trainer row.

        The TAB API reports the trainer in the ``trainer_name`` field.

        Args:
            session: Database session.
            name: Trainer name. Stored stripped in ``name`` and verbatim in
                ``raw_json``.
            trainer_id: Explicit upstream ID. When ``None`` the ID is derived
                from *name*.

        Returns:
            The persisted Trainer.

        Raises:
            ValueError: If *name* is empty or whitespace-only.
        """
        if not (name and name.strip()):
            raise ValueError("Trainer name is required")

        resolved_id = (
            trainer_id if trainer_id is not None else generate_trainer_id(name)
        )

        insert_stmt = insert(Trainer).values(
            id=resolved_id,
            name=name.strip(),
            raw_json={"name": name},
        )
        proposed = insert_stmt.excluded
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["id"],
            set_={
                "name": proposed.name,
                "raw_json": proposed.raw_json,
                "updated_at": proposed.updated_at,
            },
        )

        session.execute(upsert_stmt)
        session.flush()

        return session.query(Trainer).filter(Trainer.id == resolved_id).one()

626 

627 

class StarterRepository:
    """Repository for starter (runner) operations."""

    @staticmethod
    def _coerce_placing(value: Any) -> Any:
        """Normalize a placing value before it is stored.

        Strings of decimal digits (surrounding whitespace ignored) become
        ``int``. Any other string, such as a non-finisher code, becomes
        ``None``, so it is never written into the placing column. Non-string
        values are returned unchanged.
        """
        if not isinstance(value, str):
            return value
        stripped = value.strip()
        # NOTE(review): assumes Starter.placing is an integer column (placings
        # are compared and audited as ints below). Confirm before relying on it.
        return int(stripped) if stripped.isdecimal() else None

    @staticmethod
    def upsert(
        session: Session,
        race_id: int,
        runner_data: dict[str, Any],
        placing: int | None = None,
    ) -> Starter | None:
        """Upsert starter/runner from TAB API data.

        Steps:

        1. Normalize the payload with :func:`normalize_runner_data`; skip
           scratched runners.
        2. Upsert the horse (when ``horse_id`` is present) and the driver and
           trainer (when their names are non-blank).
        3. Match an existing starter by ``(race_id, horse_id)``, then by
           ``(race_id, runner_number)``. Update it if found, otherwise insert
           a new one.

        When an existing starter already had a placing and it changes, the
        change is recorded via ``AuditLogger.log_change`` as a ``CORRECT``
        action.

        Args:
            session: Database session
            race_id: Parent race ID
            runner_data: Runner data from TAB API (EventRunner)
            placing: Optional placing from results (matched by entrant_id).
                When ``None``, the payload's ``placing`` is used.

        Returns:
            Starter instance or None if scratched
        """
        runner_data = normalize_runner_data(runner_data)

        if runner_data.get("is_scratched", False):
            logger.debug(f"Skipping scratched runner: {runner_data.get('name')}")
            return None

        horse_id = runner_data.get("horse_id")
        horse_name = runner_data.get("name", "Unknown")
        if horse_id:
            HorseRepository.upsert(session, horse_id, horse_name, runner_data)

        # TAB uses "jockey" for all racing types; for harness it is the driver.
        jockey_name = runner_data.get("jockey") or runner_data.get("driver_name")
        driver_id = None
        if jockey_name and jockey_name.strip():
            driver = DriverRepository.upsert(
                session, jockey_name, driver_id=runner_data.get("driver_id")
            )
            driver_id = driver.id
        elif runner_data.get("driver_id") is not None:
            # NOTE(review): no Driver row is upserted on this path. If
            # starters.driver_id is a foreign key, confirm the ID already exists.
            driver_id = runner_data.get("driver_id")

        trainer_name = runner_data.get("trainer_name")
        trainer_id = None
        if trainer_name and trainer_name.strip():
            trainer = TrainerRepository.upsert(
                session, trainer_name, trainer_id=runner_data.get("trainer_id")
            )
            trainer_id = trainer.id
        elif runner_data.get("trainer_id") is not None:
            # NOTE(review): as above, no Trainer row is upserted here.
            trainer_id = runner_data.get("trainer_id")

        # TAB provides barrier as integer directly
        barrier = runner_data.get("barrier")

        # Harness-specific barrier position (e.g., "1F" for front row, "2B" for back)
        barrier_position = runner_data.get("barrier_position")

        # Runner/saddlecloth number
        runner_number = runner_data.get("runner_number")

        # Handicap in meters ("handicap" takes precedence over "handicap_m")
        handicap_m = runner_data.get("handicap")
        if handicap_m is None:
            handicap_m = runner_data.get("handicap_m")
        if handicap_m:
            handicap_m = int(handicap_m)

        # NOTE(review): did_not_finish is never derived from the payload; it is
        # always stored as False.
        did_not_finish = False
        if placing is None:
            # No explicit result: fall back to the payload (absent for upcoming
            # races).
            placing = runner_data.get("placing")
        placing = StarterRepository._coerce_placing(placing)

        # Match an existing starter by horse first, then by runner number.
        existing = None
        if horse_id:
            existing = (
                session.query(Starter)
                .filter(Starter.race_id == race_id, Starter.horse_id == horse_id)
                .first()
            )
        if existing is None and runner_number is not None:
            existing = (
                session.query(Starter)
                .filter(
                    Starter.race_id == race_id,
                    Starter.runner_number == runner_number,
                )
                .first()
            )

        if existing:
            old_placing = existing.placing
            # Only a change between two known placings counts as a correction;
            # the first recorded result is not audited.
            placing_changed = (
                placing is not None
                and old_placing is not None
                and placing != old_placing
            )

            existing.driver_id = driver_id
            existing.trainer_id = trainer_id
            existing.runner_number = runner_number
            existing.barrier = barrier
            existing.barrier_position = barrier_position
            existing.handicap_m = handicap_m
            # NOTE(review): a re-ingest without a placing overwrites a stored
            # placing with None. Confirm that is intended.
            existing.placing = placing
            existing.did_not_finish = did_not_finish
            existing.raw_json = runner_data
            session.flush()

            if placing_changed:
                AuditLogger.log_change(
                    session=session,
                    table_name="starters",
                    record_id=str(existing.id),
                    action="CORRECT",
                    old_values={"placing": old_placing},
                    new_values={"placing": placing},
                    changed_by="system",
                    change_reason=(
                        f"Placing corrected from {old_placing} to {placing} "
                        f"for starter {existing.id} in race {race_id}"
                    ),
                )

            return existing

        starter = Starter(
            race_id=race_id,
            horse_id=horse_id,
            driver_id=driver_id,
            trainer_id=trainer_id,
            runner_number=runner_number,
            barrier=barrier,
            barrier_position=barrier_position,
            handicap_m=handicap_m,
            placing=placing,
            did_not_finish=did_not_finish,
            raw_json=runner_data,
        )
        session.add(starter)
        session.flush()
        return starter

    @staticmethod
    def get_by_race(session: Session, race_id: int) -> list[Starter]:
        """Get all starters for a race.

        Args:
            session: Database session
            race_id: Race ID

        Returns:
            List of starters
        """
        return session.query(Starter).filter(Starter.race_id == race_id).all()

801 

802 

class RatingSnapshotRepository:
    """Repository for rating snapshot operations."""

    @staticmethod
    def upsert(
        session: Session,
        entity_type: EntityType,
        entity_id: int,
        as_of_race_id: int,
        rating: float,
        rd: float | None = None,
        meta: dict[str, Any] | None = None,
    ) -> RatingSnapshot:
        """Upsert rating snapshot.

        Conflicts on ``(entity_type, entity_id, as_of_race_id)`` overwrite
        ``rating``, ``rd`` and ``meta``.

        Args:
            session: Database session
            entity_type: Type of entity (horse/driver/trainer)
            entity_id: Entity ID
            as_of_race_id: Race ID this rating is computed after
            rating: Elo rating value
            rd: Rating deviation (optional)
            meta: Additional metadata (optional)

        Returns:
            RatingSnapshot instance
        """
        stmt = insert(RatingSnapshot).values(
            entity_type=entity_type,
            entity_id=entity_id,
            as_of_race_id=as_of_race_id,
            rating=rating,
            rd=rd,
            meta=meta,
        )
        stmt = stmt.on_conflict_do_update(
            index_elements=["entity_type", "entity_id", "as_of_race_id"],
            set_={
                "rating": stmt.excluded.rating,
                "rd": stmt.excluded.rd,
                "meta": stmt.excluded.meta,
            },
        )

        session.execute(stmt)
        session.flush()

        return (
            session.query(RatingSnapshot)
            .filter(
                RatingSnapshot.entity_type == entity_type,
                RatingSnapshot.entity_id == entity_id,
                RatingSnapshot.as_of_race_id == as_of_race_id,
            )
            .one()
        )

    @staticmethod
    def get_latest_rating(
        session: Session,
        entity_type: EntityType,
        entity_id: int,
        before_race_id: int | None = None,
    ) -> RatingSnapshot | None:
        """Get latest rating for an entity.

        "Latest" is determined by meeting date, then race start time (races
        without one rank as earliest on their date), then race number, then
        race id.

        When *before_race_id* is given, only snapshots from races strictly
        before that race are considered:

        * If the target race has a start time, races with an earlier start
          time qualify. Races without one qualify if they are on an earlier
          date or earlier on the same meeting's card.
        * Otherwise, races on an earlier date, or with a lower race number in
          the same meeting, qualify.

        Args:
            session: Database session
            entity_type: Type of entity
            entity_id: Entity ID
            before_race_id: Only consider ratings before this race (exclusive)

        Returns:
            Latest RatingSnapshot, or None if there is none or the target race
            (or its meeting) cannot be found.
        """
        query = (
            session.query(RatingSnapshot)
            .join(Race, RatingSnapshot.as_of_race_id == Race.id)
            .join(Meeting, Race.meeting_id == Meeting.id)
            .filter(
                RatingSnapshot.entity_type == entity_type,
                RatingSnapshot.entity_id == entity_id,
            )
        )

        if before_race_id is not None:
            target_race = (
                session.query(Race)
                .options(joinedload(Race.meeting))
                .filter(Race.id == before_race_id)
                .first()
            )
            if target_race is None or target_race.meeting is None:
                return None

            target_date = target_race.meeting.meeting_date
            # Earlier date, or earlier on the same meeting's card.
            before_by_card = or_(
                Meeting.meeting_date < target_date,
                and_(
                    Meeting.meeting_date == target_date,
                    Race.meeting_id == target_race.meeting_id,
                    Race.race_number < target_race.race_number,
                ),
            )

            if target_race.race_datetime:
                # `race_datetime < x` is never true for NULL start times, so
                # those races were previously dropped entirely, even when they
                # were days earlier. Fall back to card order for them.
                query = query.filter(
                    or_(
                        Race.race_datetime < target_race.race_datetime,
                        and_(Race.race_datetime.is_(None), before_by_card),
                    )
                )
            else:
                query = query.filter(before_by_card)

        return query.order_by(
            Meeting.meeting_date.desc(),
            Race.race_datetime.desc().nulls_last(),
            Race.race_number.desc(),
            Race.id.desc(),
        ).first()

    @staticmethod
    def get_top_ratings(
        session: Session,
        entity_type: EntityType,
        limit: int = 100,
        as_of_date: date | None = None,
    ) -> list[RatingSnapshot]:
        """Get top-rated entities.

        For each entity, the latest snapshot is selected (same ordering as
        :meth:`get_latest_rating`, via ``row_number()``). The results are then
        sorted by rating descending and limited. Live queries (no
        *as_of_date*) cache the resulting snapshot IDs for 60 seconds.

        Args:
            session: Database session
            entity_type: Type of entity
            limit: Maximum results
            as_of_date: Only include ratings up to this date

        Returns:
            List of latest rating snapshots, sorted by rating descending
        """
        if as_of_date is None:
            cache_key = f"top_ratings:{entity_type.value}:{limit}:live"
            cached = cache_get(cache_key)
            if cached is not None:
                ids = cached.get("snapshot_ids", [])
                if ids:
                    # Re-fetch by ID so callers get session-bound ORM objects.
                    return (
                        session.query(RatingSnapshot)
                        .filter(RatingSnapshot.id.in_(ids))
                        .order_by(desc(RatingSnapshot.rating))
                        .all()
                    )
                return []

        sort_order = (
            Meeting.meeting_date.desc(),
            Race.race_datetime.desc().nulls_last(),
            Race.race_number.desc(),
            Race.id.desc(),
        )

        ranked = (
            session.query(
                RatingSnapshot.id.label("snapshot_id"),
                RatingSnapshot.entity_id,
                func.row_number()
                .over(partition_by=RatingSnapshot.entity_id, order_by=sort_order)
                .label("rn"),
            )
            .join(Race, RatingSnapshot.as_of_race_id == Race.id)
            .join(Meeting, Race.meeting_id == Meeting.id)
            .filter(RatingSnapshot.entity_type == entity_type)
        )

        if as_of_date is not None:
            ranked = ranked.filter(Meeting.meeting_date <= as_of_date)

        ranked = ranked.subquery()

        results = (
            session.query(RatingSnapshot)
            .join(ranked, RatingSnapshot.id == ranked.c.snapshot_id)
            .filter(ranked.c.rn == 1)
            .order_by(desc(RatingSnapshot.rating))
            .limit(limit)
            .all()
        )

        # Cache the snapshot IDs for live queries (TTL: 60 seconds)
        if as_of_date is None and results:
            cache_key = f"top_ratings:{entity_type.value}:{limit}:live"
            cache_set(cache_key, {"snapshot_ids": [r.id for r in results]}, ttl=60)

        return results

995 

996 

class BarrierAdjustmentRepository:
    """Persistence helpers for per-barrier adjustments.

    Rows are keyed by ``(venue, start_type, distance_bucket, barrier)``;
    ``venue`` / ``start_type`` may be ``None`` for global / any-start rows.
    """

    @staticmethod
    def _key_filter(
        venue: str | None,
        start_type: str | None,
        distance_bucket: str,
        barrier: int,
    ) -> tuple[Any, ...]:
        """Build the filter conditions that select one adjustment row by key."""
        return (
            BarrierAdjustment.venue == venue,
            BarrierAdjustment.start_type == start_type,
            BarrierAdjustment.distance_bucket == distance_bucket,
            BarrierAdjustment.barrier == barrier,
        )

    @staticmethod
    def upsert(
        session: Session,
        venue: str | None,
        start_type: str | None,
        distance_bucket: str,
        barrier: int,
        adjustment: float,
        sample_count: int = 1,
    ) -> BarrierAdjustment:
        """Insert or overwrite a barrier adjustment, then re-read it.

        NOTE(review): PostgreSQL treats NULLs as distinct in unique
        constraints, so ON CONFLICT may not fire when *venue* or *start_type*
        is ``None`` (unless the index is ``NULLS NOT DISTINCT`` or
        expression-based). Duplicate rows would then make the final ``.one()``
        raise. Verify against the schema.

        Args:
            session: Database session.
            venue: Venue name, or ``None`` for global.
            start_type: Start type, or ``None`` for any.
            distance_bucket: Distance bucket string.
            barrier: Barrier number.
            adjustment: Adjustment value.
            sample_count: Number of observations.

        Returns:
            The persisted BarrierAdjustment.
        """
        row_values = {
            "venue": venue,
            "start_type": start_type,
            "distance_bucket": distance_bucket,
            "barrier": barrier,
            "adjustment": adjustment,
            "sample_count": sample_count,
        }
        insert_stmt = insert(BarrierAdjustment).values(**row_values)
        proposed = insert_stmt.excluded
        upsert_stmt = insert_stmt.on_conflict_do_update(
            index_elements=["venue", "start_type", "distance_bucket", "barrier"],
            set_={
                "adjustment": proposed.adjustment,
                "sample_count": proposed.sample_count,
                "updated_at": proposed.updated_at,
            },
        )

        session.execute(upsert_stmt)
        session.flush()

        key = BarrierAdjustmentRepository._key_filter(
            venue, start_type, distance_bucket, barrier
        )
        return session.query(BarrierAdjustment).filter(*key).one()

    @staticmethod
    def get_all(session: Session) -> list[BarrierAdjustment]:
        """Return every stored barrier adjustment.

        Args:
            session: Database session.

        Returns:
            List of all barrier adjustments.
        """
        return session.query(BarrierAdjustment).all()

    @staticmethod
    def increment_sample(
        session: Session,
        venue: str | None,
        start_type: str | None,
        distance_bucket: str,
        barrier: int,
        delta: float,
        learning_rate: float,
    ) -> None:
        """Apply one incremental update to a barrier adjustment.

        An existing row becomes ``adjustment + learning_rate * delta`` with
        ``sample_count`` incremented by one (no flush is issued here). A
        missing row is created via :meth:`upsert` with
        ``adjustment = learning_rate * delta`` and ``sample_count = 1``.

        Args:
            session: Database session.
            venue: Venue name or ``None``.
            start_type: Start type or ``None``.
            distance_bucket: Distance bucket.
            barrier: Barrier number.
            delta: Performance delta to incorporate.
            learning_rate: Learning rate for the update.
        """
        key = BarrierAdjustmentRepository._key_filter(
            venue, start_type, distance_bucket, barrier
        )
        current = session.query(BarrierAdjustment).filter(*key).first()

        if current is None:
            BarrierAdjustmentRepository.upsert(
                session,
                venue=venue,
                start_type=start_type,
                distance_bucket=distance_bucket,
                barrier=barrier,
                adjustment=learning_rate * delta,
                sample_count=1,
            )
            return

        current.adjustment += learning_rate * delta
        current.sample_count += 1

1117 

1118 

1119class HandicapAdjustmentRepository: 

1120 """Repository for handicap adjustment operations.""" 

1121 

@staticmethod
def upsert(
    session: Session,
    venue: str | None,
    start_type: str | None,
    distance_bucket: str,
    handicap_m: int,
    adjustment: float,
    sample_count: int = 1,
) -> HandicapAdjustment:
    """Insert or overwrite the handicap adjustment for one key.

    The key is ``(venue, start_type, distance_bucket, handicap_m)``. If a row
    already exists for the key, its ``adjustment`` and ``sample_count`` are
    replaced with the supplied values; otherwise a new row is inserted.

    Args:
        session: Database session
        venue: Venue name or None for global
        start_type: Start type or None for any
        distance_bucket: Distance bucket string
        handicap_m: Handicap in meters
        adjustment: Adjustment value
        sample_count: Number of observations

    Returns:
        The persisted HandicapAdjustment instance, with attribute values
        reflecting what was just written.
    """
    # ``== None`` is rendered by SQLAlchemy as ``IS NULL``, so this filter
    # also matches the global (NULL venue / NULL start_type) rows.
    key_filter = (
        HandicapAdjustment.venue == venue,
        HandicapAdjustment.start_type == start_type,
        HandicapAdjustment.distance_bucket == distance_bucket,
        HandicapAdjustment.handicap_m == handicap_m,
    )

    if venue is None or start_type is None:
        # PostgreSQL unique indexes treat NULLs as distinct by default, so
        # ON CONFLICT on a key containing NULL would never fire and each call
        # would insert a duplicate row (making the final ``.one()`` fail).
        # Update an existing row in place instead.
        existing = session.query(HandicapAdjustment).filter(*key_filter).first()
        if existing is not None:
            existing.adjustment = adjustment
            existing.sample_count = sample_count
            session.flush()
            return existing

    stmt = insert(HandicapAdjustment).values(
        venue=venue,
        start_type=start_type,
        distance_bucket=distance_bucket,
        handicap_m=handicap_m,
        adjustment=adjustment,
        sample_count=sample_count,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=["venue", "start_type", "distance_bucket", "handicap_m"],
        set_={
            "adjustment": stmt.excluded.adjustment,
            "sample_count": stmt.excluded.sample_count,
            # updated_at is not in .values(); excluded.updated_at is whatever
            # the INSERT would have written (presumably the column default --
            # confirm against the model).
            "updated_at": stmt.excluded.updated_at,
        },
    )

    session.execute(stmt)
    session.flush()

    # The upsert runs as a Core statement outside the ORM unit of work, so an
    # instance already present in the identity map would otherwise be
    # returned with stale attribute values; populate_existing refreshes it.
    return (
        session.query(HandicapAdjustment)
        .filter(*key_filter)
        .populate_existing()
        .one()
    )

1176 

@staticmethod
def get_all(session: Session) -> list[HandicapAdjustment]:
    """Return every stored handicap adjustment row.

    Args:
        session: SQLAlchemy session used to run the query.

    Returns:
        A list with one HandicapAdjustment per stored row; no filtering or
        ordering is applied.
    """
    adjustments_query = session.query(HandicapAdjustment)
    return adjustments_query.all()

1188 

@staticmethod
def increment_sample(
    session: Session,
    venue: str | None,
    start_type: str | None,
    distance_bucket: str,
    handicap_m: int,
    delta: float,
    learning_rate: float,
) -> None:
    """Fold one new observation into a handicap adjustment.

    The stored adjustment moves by ``learning_rate * delta`` and its sample
    count grows by one. When no row exists yet for the key, a row is created
    through ``HandicapAdjustmentRepository.upsert`` seeded with that step and
    a sample count of 1.

    Args:
        session: Database session
        venue: Venue name or None
        start_type: Start type or None
        distance_bucket: Distance bucket
        handicap_m: Handicap in meters
        delta: Performance delta to incorporate
        learning_rate: Learning rate for update
    """
    # ``== None`` comparisons are rendered as ``IS NULL`` by SQLAlchemy, so
    # the global (NULL venue / NULL start_type) rows are matched too.
    row = (
        session.query(HandicapAdjustment)
        .filter(
            HandicapAdjustment.venue == venue,
            HandicapAdjustment.start_type == start_type,
            HandicapAdjustment.distance_bucket == distance_bucket,
            HandicapAdjustment.handicap_m == handicap_m,
        )
        .first()
    )

    if row is None:
        # No observations yet: seed the row with this first step.
        HandicapAdjustmentRepository.upsert(
            session,
            venue=venue,
            start_type=start_type,
            distance_bucket=distance_bucket,
            handicap_m=handicap_m,
            adjustment=learning_rate * delta,
            sample_count=1,
        )
        return

    # Incremental update: adj_new = adj_old + lr * delta
    row.adjustment = row.adjustment + learning_rate * delta
    row.sample_count = row.sample_count + 1