Coverage for packages / core / common / scheduler.py: 49%

283 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 08:37 +1200

1"""Background job scheduler for automated tasks. 

2 

3Wraps APScheduler's AsyncIOScheduler with TipSharks-specific job types 

4for data ingestion, rating recomputation, and HRNZ scraping. 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import smtplib 

11from datetime import date, timedelta 

12from email.message import EmailMessage 

13from typing import Any 

14 

15from apscheduler.schedulers.asyncio import AsyncIOScheduler 

16from apscheduler.triggers.cron import CronTrigger 

17 

18from packages.core.common.logging import get_logger 

19from packages.core.common.settings import get_settings 

20from packages.core.storage.database import get_session 

21 

# Module-level logger shared by TipSharksScheduler and the background job functions.
logger = get_logger(__name__)

23 

24 

class TipSharksScheduler:
    """Scheduler for TipSharks background jobs.

    Wraps APScheduler's ``AsyncIOScheduler`` with convenience methods for
    adding ingest, recompute, HRNZ scrape, evaluation, and full-recompute
    jobs, each driven by a cron expression in the configured timezone.

    Each job executes the corresponding business logic function directly
    (no subprocess calls), using ``get_session()`` for database access.

    Constructing an instance registers it as the module-level
    ``_scheduler_instance`` so job functions can send failure notifications
    through it; the most recently constructed instance wins.
    """

    # Upper bound (seconds) on blocking SMTP socket operations, so an
    # unresponsive mail server cannot block a failing job indefinitely.
    _SMTP_TIMEOUT_SECONDS = 30.0

    def __init__(self) -> None:
        self._scheduler = AsyncIOScheduler()
        self._running = False
        # Register this instance so module-level job functions can reach
        # _send_failure_notification() via _notify_failure().
        global _scheduler_instance
        _scheduler_instance = self

    # ── Lifecycle ─────────────────────────────────────────────────────

    def start(self) -> None:
        """Start the scheduler.

        Safe to call multiple times — only starts once; repeated calls log a
        warning and return.
        """
        if self._running:
            logger.warning("Scheduler already running")
            return
        self._scheduler.start()
        self._running = True
        logger.info("TipSharksScheduler started")

    def shutdown(self, wait: bool = True) -> None:
        """Shut the scheduler down gracefully.

        Calling this while the scheduler is not running logs a warning and
        does nothing.

        Args:
            wait: If True, wait for running jobs to finish.
        """
        if not self._running:
            logger.warning("Scheduler not running")
            return
        self._scheduler.shutdown(wait=wait)
        self._running = False
        logger.info("TipSharksScheduler shut down")

    @property
    def running(self) -> bool:
        """Whether the scheduler is currently running."""
        return self._running

    # ── Job management ────────────────────────────────────────────────

    def _add_cron_job(
        self,
        func: Any,
        *,
        args: list[Any],
        cron_expr: str,
        job_id: str,
        name: str,
        log_message: str,
        log_extra: dict[str, Any] | None = None,
    ) -> str:
        """Register ``func`` on a cron trigger and log the registration.

        An existing job with the same ``job_id`` is replaced.

        Args:
            func: The job callable.
            args: Positional arguments passed to ``func`` on every run.
            cron_expr: Crontab expression, interpreted in the configured timezone.
            job_id: Unique identifier for the job.
            name: Human-readable job name.
            log_message: Message logged once the job is registered.
            log_extra: Extra structured fields logged after ``job_id``/``cron``.

        Returns:
            The job ID.

        Raises:
            ValueError: If ``cron_expr`` is not a valid crontab expression.
        """
        # Build the trigger first so an invalid cron expression fails before
        # anything is registered.
        trigger = CronTrigger.from_crontab(cron_expr, timezone=self._get_timezone())
        self._scheduler.add_job(
            func,
            trigger=trigger,
            args=args,
            id=job_id,
            replace_existing=True,
            name=name,
        )
        logger.info(
            log_message,
            extra={"job_id": job_id, "cron": cron_expr, **(log_extra or {})},
        )
        return job_id

    def add_ingest_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        category: str = "H",
        source: str = "tab",
        cron_expr: str = "0 6 * * *",
        job_id: str = "ingest_daily",
    ) -> str:
        """Schedule a recurring data ingestion job.

        Args:
            date_from: Start date (YYYY-MM-DD). Defaults to
                ``scheduler.ingest_days_back`` days before the run date.
            date_to: End date (YYYY-MM-DD). Defaults to the run date.
            category: Racing category (T, H, G).
            source: Data source ("tab" or "ingest").
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        return self._add_cron_job(
            _run_ingest,
            args=[date_from, date_to, category, source],
            cron_expr=cron_expr,
            job_id=job_id,
            name=f"Ingest ({category}) from {source}",
            log_message="Scheduled ingest job",
            log_extra={"category": category, "source": source},
        )

    def add_recompute_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        clear: bool = False,
        cron_expr: str = "0 2 * * 0",
        job_id: str = "recompute_weekly",
    ) -> str:
        """Schedule a recurring rating recomputation job.

        Args:
            date_from: Start date (YYYY-MM-DD). Defaults to
                ``scheduler.recompute_days_back`` days before the run date.
            date_to: End date (YYYY-MM-DD). Defaults to the run date.
            clear: If True, clear existing ratings before recompute.
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        return self._add_cron_job(
            _run_recompute,
            args=[date_from, date_to, clear],
            cron_expr=cron_expr,
            job_id=job_id,
            name=f"Recompute ({date_from or 'auto'} to {date_to or 'auto'})",
            log_message="Scheduled recompute job",
            log_extra={"clear": clear},
        )

    def add_scrape_job(
        self,
        urls: list[str] | None = None,
        club_codes: list[str] | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
        cron_expr: str = "0 4 * * 0",
        job_id: str = "scrape_hrnz_weekly",
    ) -> str:
        """Schedule a recurring HRNZ scraping job.

        Args:
            urls: Specific HRNZ result URLs to scrape.
            club_codes: HRNZ club codes to generate URLs for.
            date_from: Start date (YYYY-MM-DD).
            date_to: End date (YYYY-MM-DD).
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        return self._add_cron_job(
            _run_scrape,
            # Store copies so the caller's lists are not shared with the job.
            args=[
                list(urls) if urls is not None else None,
                list(club_codes) if club_codes is not None else None,
                date_from,
                date_to,
            ],
            cron_expr=cron_expr,
            job_id=job_id,
            name="HRNZ Scrape",
            log_message="Scheduled scrape job",
        )

    def add_eval_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        cron_expr: str = "0 4 * * 0",
        job_id: str = "eval_weekly",
    ) -> str:
        """Schedule a recurring weekly evaluation job.

        Args:
            date_from: Start date (YYYY-MM-DD). Defaults to 7 days ago.
            date_to: End date (YYYY-MM-DD). Defaults to today.
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        return self._add_cron_job(
            _run_eval,
            args=[date_from, date_to],
            cron_expr=cron_expr,
            job_id=job_id,
            name="Weekly Evaluation",
            log_message="Scheduled eval job",
        )

    def add_full_recompute_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        clear: bool = True,
        cron_expr: str = "0 3 1 * *",
        job_id: str = "recompute_monthly_full",
    ) -> str:
        """Schedule a recurring monthly full recompute job.

        Args:
            date_from: Start date (YYYY-MM-DD). Defaults to 365 days ago.
            date_to: End date (YYYY-MM-DD). Defaults to today.
            clear: If True, clear existing ratings before recompute.
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        return self._add_cron_job(
            _run_full_recompute,
            args=[date_from, date_to, clear],
            cron_expr=cron_expr,
            job_id=job_id,
            name="Monthly Full Recompute",
            log_message="Scheduled full recompute job",
            log_extra={"clear": clear},
        )

    def _send_failure_notification(self, job_id: str, error: str) -> None:
        """Send an email notification for a scheduler job failure.

        Only sends if ``email_notifications`` is enabled in settings and an
        SMTP host and recipients are configured. Any error while building or
        sending the email is logged and swallowed (best effort).

        Args:
            job_id: The failed job identifier.
            error: The error message from the job failure.
        """
        from datetime import datetime, timezone

        settings = get_settings().scheduler
        if not settings.email_notifications:
            return
        if not settings.email_smtp_host or not settings.email_to:
            logger.warning(
                "Email notifications enabled but SMTP host or recipients not configured",
                extra={"job_id": job_id},
            )
            return

        recipients = settings.email_to
        if isinstance(recipients, (list, tuple)):
            # EmailMessage header values must be strings; join a recipient list.
            recipients = ", ".join(recipients)

        try:
            timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
            msg = EmailMessage()
            msg.set_content(
                f"Scheduled job '{job_id}' failed with error:\n\n{error}\n\n"
                f"Timestamp: {timestamp}\n\n"
                "TipSharks Scheduler Failure Notification"
            )
            msg["Subject"] = f"[TipSharks] Scheduler Failure: {job_id}"
            msg["From"] = settings.email_from or "scheduler@tipsharks.com"
            msg["To"] = recipients

            with smtplib.SMTP(
                settings.email_smtp_host,
                settings.email_smtp_port,
                timeout=self._SMTP_TIMEOUT_SECONDS,
            ) as server:
                # TLS is only negotiated when credentials are configured.
                if settings.email_username:
                    server.starttls()
                    server.login(settings.email_username, settings.email_password)
                server.send_message(msg)

            logger.info(
                "Sent failure notification email",
                extra={"job_id": job_id, "recipients": settings.email_to},
            )
        except Exception as exc:
            logger.error(
                "Failed to send failure notification",
                extra={"job_id": job_id, "error": str(exc)},
                exc_info=True,
            )

    # ── Inspection and removal ───────────────────────────────────────

    @staticmethod
    def _job_to_dict(job: Any) -> dict[str, Any]:
        """Convert an APScheduler job into a plain, serialisable dict."""
        return {
            "id": job.id,
            "name": job.name,
            # Paused jobs have no next run time.
            "next_run_time": (
                str(job.next_run_time) if job.next_run_time is not None else None
            ),
            "trigger": str(job.trigger),
        }

    def list_jobs(self) -> list[dict[str, Any]]:
        """List all scheduled jobs.

        Returns:
            List of dicts with keys: id, name, next_run_time, trigger.
        """
        return [self._job_to_dict(job) for job in self._scheduler.get_jobs()]

    def remove_job(self, job_id: str) -> bool:
        """Remove a scheduled job by ID.

        Args:
            job_id: The job identifier.

        Returns:
            True if the job was removed, False if not found.
        """
        try:
            self._scheduler.remove_job(job_id)
        except KeyError:
            # APScheduler raises JobLookupError (a KeyError subclass) for an
            # unknown ID; any other error is a real failure and propagates.
            logger.warning("Job not found for removal", extra={"job_id": job_id})
            return False
        logger.info("Removed scheduled job", extra={"job_id": job_id})
        return True

    def get_job(self, job_id: str) -> dict[str, Any] | None:
        """Get a single job by ID.

        Args:
            job_id: The job identifier.

        Returns:
            Job info dict (same keys as ``list_jobs``) or None if not found.
        """
        job = self._scheduler.get_job(job_id)
        return None if job is None else self._job_to_dict(job)

    # ── Internal helpers ─────────────────────────────────────────────

    def _get_timezone(self) -> str:
        """Get the configured timezone string."""
        return get_settings().scheduler.timezone

    def load_default_jobs(self) -> list[str]:
        """Add default jobs from settings.

        Reads the ``*_cron`` scheduler settings and adds the standard ingest,
        recompute, HRNZ scrape, eval, and full recompute jobs whose cron
        expression is set (truthy), each under its standard job ID.

        Returns:
            List of job IDs that were added, in the order above.
        """
        settings = get_settings().scheduler
        defaults = (
            (settings.ingest_cron, self.add_ingest_job, "ingest_daily"),
            (settings.recompute_cron, self.add_recompute_job, "recompute_weekly"),
            (settings.scrape_cron, self.add_scrape_job, "scrape_hrnz_weekly"),
            (settings.eval_cron, self.add_eval_job, "eval_weekly"),
            (
                settings.full_recompute_cron,
                self.add_full_recompute_job,
                "recompute_monthly_full",
            ),
        )
        return [
            add_job(cron_expr=cron_expr, job_id=default_id)
            for cron_expr, add_job, default_id in defaults
            if cron_expr
        ]

427 

428 

# ── Background job functions ─────────────────────────────────────────

# Scheduler instance used for failure notifications. Assigned by
# TipSharksScheduler.__init__; the most recently constructed scheduler wins.
_scheduler_instance: TipSharksScheduler | None = None


def _notify_failure(job_id: str, error: str) -> None:
    """Send a failure notification via the registered scheduler instance.

    Does nothing when no ``TipSharksScheduler`` has been constructed yet.

    Args:
        job_id: Identifier reported as the failed job.
        error: Error message to include in the notification.
    """
    # Reading a module global needs no ``global`` declaration.
    instance = _scheduler_instance
    if instance is not None:
        instance._send_failure_notification(job_id, error)

440 

441 

def _run_ingest(
    date_from: str | None = None,
    date_to: str | None = None,
    category: str = "H",
    source: str = "tab",
) -> dict[str, int]:
    """Execute an ingestion job.

    Args:
        date_from: Start date (YYYY-MM-DD). Defaults to
            ``scheduler.ingest_days_back`` days before today.
        date_to: End date (YYYY-MM-DD). Defaults to today.
        category: Racing category (T, H, G).
        source: Data source ("tab" or "ingest").

    Returns:
        Dict with ``meetings``, ``races``, ``starters`` and ``errors``
        counts. On failure every count is 0 except ``errors``, which is 1.
    """
    try:
        # Resolve settings and dates inside the try so a malformed date or a
        # settings error is logged and notified like any other job failure.
        settings = get_settings()
        if date_from:
            start_date = date.fromisoformat(date_from)
        else:
            start_date = date.today() - timedelta(
                days=settings.scheduler.ingest_days_back
            )
        end_date = date.fromisoformat(date_to) if date_to else date.today()

        logger.info(
            "Scheduled ingest starting",
            extra={
                "date_from": str(start_date),
                "date_to": str(end_date),
                "category": category,
                "source": source,
            },
        )

        from packages.core.storage.ingestion import IngestionService

        with get_session() as session:
            service = IngestionService(session, source=source)
            # NOTE(review): asyncio.run needs no running event loop in this
            # thread; this assumes APScheduler dispatches plain functions to a
            # worker thread rather than the scheduler's loop — confirm.
            meetings, races, starters = asyncio.run(
                service.ingest_date_range(start_date, end_date, category=category)
            )

        stats = {
            "meetings": meetings,
            "races": races,
            "starters": starters,
            "errors": service.stats["errors"],
        }
        logger.info("Scheduled ingest complete", extra=stats)
        return stats
    except Exception as exc:
        logger.error(
            "Scheduled ingest failed", extra={"error": str(exc)}, exc_info=True
        )
        # NOTE(review): the job ID is hard-coded, so ingest jobs registered
        # under a custom job_id are reported as "ingest_daily".
        _notify_failure("ingest_daily", str(exc))
        return {"meetings": 0, "races": 0, "starters": 0, "errors": 1}

507 

508 

def _run_recompute(
    date_from: str | None = None,
    date_to: str | None = None,
    clear: bool = False,
) -> dict[str, int]:
    """Execute a rating recompute job.

    Args:
        date_from: Start date (YYYY-MM-DD). Defaults to
            ``scheduler.recompute_days_back`` days before today.
        date_to: End date (YYYY-MM-DD). Defaults to today.
        clear: If True, clear existing ratings before recomputing.

    Returns:
        ``{"snapshots_created": n}``; ``n`` is 0 when the job fails.
    """
    try:
        # Resolve settings and dates inside the try so a malformed date or a
        # settings error is logged and notified like any other job failure.
        settings = get_settings()
        if date_from:
            start_date = date.fromisoformat(date_from)
        else:
            start_date = date.today() - timedelta(
                days=settings.scheduler.recompute_days_back
            )
        end_date = date.fromisoformat(date_to) if date_to else date.today()

        logger.info(
            "Scheduled recompute starting",
            extra={"date_from": str(start_date), "date_to": str(end_date), "clear": clear},
        )

        from packages.core.ratings.recompute import recompute_ratings

        with get_session() as session:
            snapshots_created = recompute_ratings(
                session,
                start_date,
                end_date,
                clear_existing=clear,
            )

        stats = {"snapshots_created": snapshots_created}
        logger.info("Scheduled recompute complete", extra=stats)
        return stats
    except Exception as exc:
        logger.error(
            "Scheduled recompute failed", extra={"error": str(exc)}, exc_info=True
        )
        # NOTE(review): the job ID is hard-coded, so recompute jobs registered
        # under a custom job_id are reported as "recompute_weekly".
        _notify_failure("recompute_weekly", str(exc))
        return {"snapshots_created": 0}

563 

564 

def _run_scrape(
    urls: list[str] | None = None,
    club_codes: list[str] | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
) -> dict[str, int]:
    """Execute an HRNZ scraping job.

    Scrapes each target URL once and skips meetings that have no races or
    whose scraped date falls outside ``[date_from, date_to]``. For the rest,
    it upserts the meeting, horses, drivers, trainers, races and starters in
    one session per URL. A failure on one URL is counted in ``errors`` and
    does not stop the remaining URLs.

    Args:
        urls: Specific HRNZ result URLs to scrape. The list is never mutated.
        club_codes: HRNZ club codes to generate URLs for (added to ``urls``).
        date_from: Start date (YYYY-MM-DD). Defaults to today.
        date_to: End date (YYYY-MM-DD). Defaults to today.

    Returns:
        Dict with counts for meetings, races, starters, horses, drivers,
        trainers and errors. If the job fails outright, every count is 0
        except ``errors``, which is 1.
    """
    logger.info(
        "Scheduled scrape starting",
        extra={
            "urls_count": len(urls) if urls else 0,
            "club_codes": club_codes,
            "date_from": date_from,
            "date_to": date_to,
        },
    )

    try:
        from packages.core.storage.repositories import (
            DriverRepository,
            HorseRepository,
            MeetingRepository,
            RaceRepository,
            StarterRepository,
            TrainerRepository,
        )
        from packages.hrnz_scraper import HRNZScraper
        from packages.hrnz_scraper.mapper import HRNZDataMapper

        start_date = date.fromisoformat(date_from) if date_from else date.today()
        end_date = date.fromisoformat(date_to) if date_to else date.today()

        stats: dict[str, int] = {
            "meetings": 0,
            "races": 0,
            "starters": 0,
            "horses": 0,
            "drivers": 0,
            "trainers": 0,
            "errors": 0,
        }

        mapper = HRNZDataMapper()

        async def _scrape_all() -> dict[str, int]:
            async with HRNZScraper() as scraper:
                # Copy instead of aliasing ``urls``: APScheduler passes the
                # same stored args object on every run, so extending it in
                # place would accumulate generated URLs across runs (and
                # mutate the caller's list).
                target_urls: list[str] = list(urls or [])
                if club_codes:
                    # NOTE(review): imports a private helper from the API app
                    # module; consider moving URL generation into a shared package.
                    from apps.backend.api.main import _generate_hrnz_urls

                    target_urls.extend(
                        _generate_hrnz_urls(start_date, end_date, club_codes)
                    )

                if not target_urls:
                    logger.warning("No URLs provided for scrape job")
                    return stats

                # dict.fromkeys de-duplicates while preserving order.
                for url in dict.fromkeys(target_urls):
                    try:
                        scraped = await scraper.get_meeting_results(url)
                        if not scraped.get("races"):
                            continue

                        scraped_date = scraped.get("date")
                        if scraped_date:
                            try:
                                meeting_date = date.fromisoformat(scraped_date)
                                if not (start_date <= meeting_date <= end_date):
                                    continue
                            except ValueError:
                                # Unparseable date: keep the meeting rather than drop it.
                                pass

                        meeting = mapper.map_meeting(scraped)
                        entities = mapper.map_entities(scraped)

                        with get_session() as session:
                            MeetingRepository.upsert(session, meeting)
                            stats["meetings"] += 1

                            for horse in entities.get("horses", []):
                                HorseRepository.upsert(
                                    session,
                                    horse["id"],
                                    horse["name"],
                                    horse.get("raw_json"),
                                )
                                stats["horses"] += 1

                            for driver in entities.get("drivers", []):
                                DriverRepository.upsert(
                                    session, driver["name"], driver_id=driver.get("id")
                                )
                                stats["drivers"] += 1

                            for trainer in entities.get("trainers", []):
                                TrainerRepository.upsert(
                                    session,
                                    trainer["name"],
                                    trainer_id=trainer.get("id"),
                                )
                                stats["trainers"] += 1

                            races = mapper.map_races(scraped, meeting["meeting"])
                            # Maps race number -> persisted race ID for starters.
                            race_id_map = {}
                            for race in races:
                                race_obj = RaceRepository.upsert(
                                    session, meeting["meeting"], race
                                )
                                race_id_map[race["race_number"]] = race_obj.id
                                stats["races"] += 1

                            starters = mapper.map_starters(scraped, race_id_map)
                            for starter in starters:
                                StarterRepository.upsert(
                                    session,
                                    starter["race_id"],
                                    starter,
                                    starter.get("placing"),
                                )
                                stats["starters"] += 1

                            session.commit()
                    except Exception as exc:
                        stats["errors"] += 1
                        logger.error(
                            "Failed to scrape URL in scheduled job",
                            extra={"url": url, "error": str(exc)},
                            exc_info=True,
                        )

            return stats

        stats = asyncio.run(_scrape_all())

        logger.info("Scheduled scrape complete", extra=stats)
        return stats
    except Exception as exc:
        logger.error(
            "Scheduled scrape failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("scrape_hrnz_weekly", str(exc))
        return {
            "meetings": 0,
            "races": 0,
            "starters": 0,
            "horses": 0,
            "drivers": 0,
            "trainers": 0,
            "errors": 1,
        }

726 

727 

def _run_eval(
    date_from: str | None = None,
    date_to: str | None = None,
) -> dict[str, Any]:
    """Execute a weekly evaluation job.

    Compares predictions against actual results for every race in the date
    range that has at least one starter with a recorded placing.

    Args:
        date_from: Start date (YYYY-MM-DD), inclusive. Defaults to 7 days ago.
        date_to: End date (YYYY-MM-DD), inclusive. Defaults to today.

    Returns:
        Dict with these keys:

        - ``races_evaluated``: races whose comparison succeeded.
        - ``races_found``: races matched by the query.
        - ``races_skipped``: races whose comparison raised or returned nothing.
        - ``winner_accuracy``: fraction of evaluated races where the winner
          was predicted.
        - ``avg_brier_score``: mean Brier score over comparisons that report
          one (0.0 when none do).
        - ``date_from`` / ``date_to``: the resolved date range.

        On failure, returns zeros for the evaluated/accuracy/Brier keys only.
    """
    try:
        # Parse dates inside the try so malformed arguments are logged and
        # notified like any other job failure.
        if date_from:
            start_date = date.fromisoformat(date_from)
        else:
            start_date = date.today() - timedelta(days=7)
        end_date = date.fromisoformat(date_to) if date_to else date.today()

        logger.info(
            "Scheduled evaluation starting",
            extra={"date_from": str(start_date), "date_to": str(end_date)},
        )

        from packages.core.ratings.predictions import PredictionEngine
        from packages.core.storage.models import Race, Starter

        # Exclusive upper bound at the start of the following day, so races
        # held at any time on end_date are included. A "<= end_date" bound
        # would exclude them when race_datetime carries a time component.
        end_exclusive = end_date + timedelta(days=1)

        with get_session() as session:
            engine = PredictionEngine(session)
            races = (
                session.query(Race)
                .join(Starter, Race.id == Starter.race_id)
                .filter(Starter.placing.isnot(None))
                .filter(Race.race_datetime >= start_date.isoformat())
                .filter(Race.race_datetime < end_exclusive.isoformat())
                .distinct()
                .all()
            )

            races_evaluated = 0
            races_skipped = 0
            winner_correct = 0
            brier_scores: list[float] = []

            for race in races:
                try:
                    comparison = engine.compare_prediction_to_actual(race.id)
                except Exception:
                    # One bad race must not abort the evaluation, but it is
                    # recorded rather than silently dropped.
                    races_skipped += 1
                    logger.warning(
                        "Evaluation comparison failed for race",
                        extra={"race_id": race.id},
                        exc_info=True,
                    )
                    continue
                if not comparison:
                    races_skipped += 1
                    continue

                races_evaluated += 1
                if comparison.get("winner_correct"):
                    winner_correct += 1
                # A missing Brier score is excluded, not counted as 0.0,
                # because 0.0 would mean a perfect forecast.
                brier = comparison.get("brier_score")
                if brier is not None:
                    brier_scores.append(float(brier))

            win_accuracy = winner_correct / races_evaluated if races_evaluated else 0.0
            avg_brier = sum(brier_scores) / len(brier_scores) if brier_scores else 0.0

            stats = {
                "races_evaluated": races_evaluated,
                "races_found": len(races),
                "races_skipped": races_skipped,
                "winner_accuracy": round(win_accuracy, 4),
                "avg_brier_score": round(avg_brier, 4),
                "date_from": str(start_date),
                "date_to": str(end_date),
            }
            logger.info("Scheduled evaluation complete", extra=stats)
            return stats
    except Exception as exc:
        logger.error(
            "Scheduled evaluation failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("eval_weekly", str(exc))
        return {"races_evaluated": 0, "winner_accuracy": 0.0, "avg_brier_score": 0.0}

804 

805 

def _run_full_recompute(
    date_from: str | None = None,
    date_to: str | None = None,
    clear: bool = True,
) -> dict[str, int]:
    """Execute a monthly full recompute job.

    Args:
        date_from: Start date (YYYY-MM-DD). Defaults to 365 days ago.
        date_to: End date (YYYY-MM-DD). Defaults to today.
        clear: If True, clear existing ratings before recomputing.

    Returns:
        ``{"snapshots_created": n}``; ``n`` is 0 when the job fails.
    """
    try:
        # Parse dates inside the try so malformed arguments are logged and
        # notified like any other job failure.
        if date_from:
            start_date = date.fromisoformat(date_from)
        else:
            start_date = date.today() - timedelta(days=365)
        end_date = date.fromisoformat(date_to) if date_to else date.today()

        logger.info(
            "Scheduled full recompute starting",
            extra={"date_from": str(start_date), "date_to": str(end_date), "clear": clear},
        )

        from packages.core.ratings.recompute import recompute_ratings

        with get_session() as session:
            snapshots_created = recompute_ratings(
                session,
                start_date,
                end_date,
                clear_existing=clear,
            )

        stats = {"snapshots_created": snapshots_created}
        logger.info("Scheduled full recompute complete", extra=stats)
        return stats
    except Exception as exc:
        logger.error(
            "Scheduled full recompute failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("recompute_monthly_full", str(exc))
        return {"snapshots_created": 0}