Coverage for packages / core / common / scheduler.py: 49%
283 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:37 +1200
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:37 +1200
1"""Background job scheduler for automated tasks.
3Wraps APScheduler's AsyncIOScheduler with TipSharks-specific job types
4for data ingestion, rating recomputation, and HRNZ scraping.
5"""
7from __future__ import annotations
9import asyncio
10import smtplib
11from datetime import date, timedelta
12from email.message import EmailMessage
13from typing import Any
15from apscheduler.schedulers.asyncio import AsyncIOScheduler
16from apscheduler.triggers.cron import CronTrigger
18from packages.core.common.logging import get_logger
19from packages.core.common.settings import get_settings
20from packages.core.storage.database import get_session
22logger = get_logger(__name__)
class TipSharksScheduler:
    """Scheduler for TipSharks background jobs.

    Wraps APScheduler's AsyncIOScheduler with convenience methods for
    adding ingest, recompute, scrape, evaluation, and full-recompute jobs.

    Each job executes the corresponding business logic function directly
    (no subprocess calls), using ``get_session()`` for database access.
    """

    # Upper bound (seconds) on any blocking SMTP operation, so an
    # unreachable mail server cannot hang the worker that reports a failure.
    _SMTP_TIMEOUT_SECONDS = 30

    def __init__(self) -> None:
        """Create the underlying scheduler and register this instance globally."""
        self._scheduler = AsyncIOScheduler()
        self._running = False
        # The module-level job functions reach the scheduler through this
        # global when they need to send a failure notification; the most
        # recently constructed instance wins.
        global _scheduler_instance
        _scheduler_instance = self

    # ── Lifecycle ─────────────────────────────────────────────────────

    def start(self) -> None:
        """Start the scheduler.

        Safe to call multiple times — only starts once.
        """
        if self._running:
            logger.warning("Scheduler already running")
            return
        self._scheduler.start()
        self._running = True
        logger.info("TipSharksScheduler started")

    def shutdown(self, wait: bool = True) -> None:
        """Shutdown the scheduler gracefully.

        Does nothing (beyond logging a warning) if the scheduler is not
        running.

        Args:
            wait: If True, wait for running jobs to finish.
        """
        if not self._running:
            logger.warning("Scheduler not running")
            return
        self._scheduler.shutdown(wait=wait)
        self._running = False
        logger.info("TipSharksScheduler shut down")

    @property
    def running(self) -> bool:
        """Whether the scheduler is currently running."""
        return self._running

    # ── Job management ────────────────────────────────────────────────

    def _schedule(
        self,
        func: Any,
        args: list[Any],
        cron_expr: str,
        job_id: str,
        name: str,
    ) -> None:
        """Register ``func`` under ``job_id`` on a crontab trigger.

        The trigger is evaluated in the configured scheduler timezone, and
        an existing job with the same ID is replaced.
        """
        trigger = CronTrigger.from_crontab(cron_expr, timezone=self._get_timezone())
        self._scheduler.add_job(
            func,
            trigger=trigger,
            args=args,
            id=job_id,
            replace_existing=True,
            name=name,
        )

    def add_ingest_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        category: str = "H",
        source: str = "tab",
        cron_expr: str = "0 6 * * *",
        job_id: str = "ingest_daily",
    ) -> str:
        """Schedule a recurring data ingestion job.

        Args:
            date_from: Start date (YYYY-MM-DD). When omitted, the job uses
                the run date minus the ``ingest_days_back`` setting.
            date_to: End date (YYYY-MM-DD). Defaults to the run date.
            category: Racing category (T, H, G).
            source: Data source ("tab" or "ingest").
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        self._schedule(
            _run_ingest,
            [date_from, date_to, category, source],
            cron_expr,
            job_id,
            f"Ingest ({category}) from {source}",
        )
        logger.info(
            "Scheduled ingest job",
            extra={
                "job_id": job_id,
                "cron": cron_expr,
                "category": category,
                "source": source,
            },
        )
        return job_id

    def add_recompute_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        clear: bool = False,
        cron_expr: str = "0 2 * * 0",
        job_id: str = "recompute_weekly",
    ) -> str:
        """Schedule a recurring rating recomputation job.

        Args:
            date_from: Start date (YYYY-MM-DD). When omitted, the job uses
                the run date minus the ``recompute_days_back`` setting.
            date_to: End date (YYYY-MM-DD). Defaults to the run date.
            clear: If True, clear existing ratings before recompute.
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        self._schedule(
            _run_recompute,
            [date_from, date_to, clear],
            cron_expr,
            job_id,
            f"Recompute ({date_from or 'auto'} to {date_to or 'auto'})",
        )
        logger.info(
            "Scheduled recompute job",
            extra={"job_id": job_id, "cron": cron_expr, "clear": clear},
        )
        return job_id

    def add_scrape_job(
        self,
        urls: list[str] | None = None,
        club_codes: list[str] | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
        cron_expr: str = "0 4 * * 0",
        job_id: str = "scrape_hrnz_weekly",
    ) -> str:
        """Schedule a recurring HRNZ scraping job.

        Args:
            urls: Specific HRNZ result URLs to scrape.
            club_codes: HRNZ club codes to generate URLs for.
            date_from: Start date (YYYY-MM-DD).
            date_to: End date (YYYY-MM-DD).
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        self._schedule(
            _run_scrape,
            [urls, club_codes, date_from, date_to],
            cron_expr,
            job_id,
            "HRNZ Scrape",
        )
        logger.info(
            "Scheduled scrape job",
            extra={"job_id": job_id, "cron": cron_expr},
        )
        return job_id

    def add_eval_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        cron_expr: str = "0 4 * * 0",
        job_id: str = "eval_weekly",
    ) -> str:
        """Schedule a recurring weekly evaluation job.

        Args:
            date_from: Start date (YYYY-MM-DD). Defaults to 7 days before
                the run date.
            date_to: End date (YYYY-MM-DD). Defaults to the run date.
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        self._schedule(
            _run_eval,
            [date_from, date_to],
            cron_expr,
            job_id,
            "Weekly Evaluation",
        )
        logger.info(
            "Scheduled eval job",
            extra={"job_id": job_id, "cron": cron_expr},
        )
        return job_id

    def add_full_recompute_job(
        self,
        date_from: str | None = None,
        date_to: str | None = None,
        clear: bool = True,
        cron_expr: str = "0 3 1 * *",
        job_id: str = "recompute_monthly_full",
    ) -> str:
        """Schedule a recurring monthly full recompute job.

        Args:
            date_from: Start date (YYYY-MM-DD). Defaults to 365 days before
                the run date.
            date_to: End date (YYYY-MM-DD). Defaults to the run date.
            clear: If True, clear existing ratings before recompute.
            cron_expr: Cron schedule expression.
            job_id: Unique identifier for the job.

        Returns:
            The job ID.
        """
        self._schedule(
            _run_full_recompute,
            [date_from, date_to, clear],
            cron_expr,
            job_id,
            "Monthly Full Recompute",
        )
        logger.info(
            "Scheduled full recompute job",
            extra={"job_id": job_id, "cron": cron_expr, "clear": clear},
        )
        return job_id

    def _send_failure_notification(self, job_id: str, error: str) -> None:
        """Send email notification for a scheduler job failure.

        Only sends if ``email_notifications`` is enabled in settings and
        both an SMTP host and recipients are configured. Delivery problems
        are logged and never raised, so a failing mail server cannot mask
        the original job failure.

        Args:
            job_id: The failed job identifier.
            error: The error message from the job failure.
        """
        settings = get_settings().scheduler
        if not settings.email_notifications:
            return
        if not settings.email_smtp_host or not settings.email_to:
            logger.warning(
                "Email notifications enabled but SMTP host or recipients not configured",
                extra={"job_id": job_id},
            )
            return

        # Local import: the file only imports ``date`` at module level, and
        # a failure report needs the time of day, not just the day.
        from datetime import datetime

        failed_at = datetime.now().astimezone().isoformat(timespec="seconds")

        try:
            msg = EmailMessage()
            msg.set_content(
                f"Scheduled job '{job_id}' failed with error:\n\n{error}\n\n"
                f"Timestamp: {failed_at}\n\n"
                "TipSharks Scheduler Failure Notification"
            )
            msg["Subject"] = f"[TipSharks] Scheduler Failure: {job_id}"
            msg["From"] = settings.email_from or "scheduler@tipsharks.com"
            msg["To"] = settings.email_to

            with smtplib.SMTP(
                settings.email_smtp_host,
                settings.email_smtp_port,
                timeout=self._SMTP_TIMEOUT_SECONDS,
            ) as server:
                # TLS and authentication are only attempted when credentials
                # are configured; otherwise the message is sent as-is.
                if settings.email_username:
                    server.starttls()
                    server.login(settings.email_username, settings.email_password)
                server.send_message(msg)

            logger.info(
                "Sent failure notification email",
                extra={"job_id": job_id, "recipients": settings.email_to},
            )
        except Exception as exc:
            # Best effort by design: log with traceback and carry on.
            logger.error(
                "Failed to send failure notification",
                extra={"job_id": job_id, "error": str(exc)},
                exc_info=True,
            )

    # ── Inspection and removal ───────────────────────────────────────

    @staticmethod
    def _job_to_dict(job: Any) -> dict[str, Any]:
        """Serialize a scheduler job into a plain dict.

        ``next_run_time`` is read defensively: a job that has been added
        but not yet committed to a job store (scheduler not started) has no
        such attribute, and a paused job has it set to ``None``. Both cases
        are reported as ``None``.
        """
        next_run = getattr(job, "next_run_time", None)
        return {
            "id": job.id,
            "name": job.name,
            "next_run_time": str(next_run) if next_run else None,
            "trigger": str(job.trigger),
        }

    def list_jobs(self) -> list[dict[str, Any]]:
        """List all scheduled jobs.

        Returns:
            List of dicts with keys: id, name, next_run_time, trigger.
        """
        return [self._job_to_dict(job) for job in self._scheduler.get_jobs()]

    def remove_job(self, job_id: str) -> bool:
        """Remove a scheduled job by ID.

        Args:
            job_id: The job identifier.

        Returns:
            True if the job was removed, False if not found.

        Raises:
            Exception: Any scheduler error other than "job not found" is
                propagated rather than being reported as a missing job.
        """
        # Local import keeps the narrower exception type self-contained.
        from apscheduler.jobstores.base import JobLookupError

        try:
            self._scheduler.remove_job(job_id)
        except JobLookupError:
            logger.warning("Job not found for removal", extra={"job_id": job_id})
            return False
        logger.info("Removed scheduled job", extra={"job_id": job_id})
        return True

    def get_job(self, job_id: str) -> dict[str, Any] | None:
        """Get a single job by ID.

        Args:
            job_id: The job identifier.

        Returns:
            Job info dict or None if not found.
        """
        job = self._scheduler.get_job(job_id)
        if job is None:
            return None
        return self._job_to_dict(job)

    # ── Internal helpers ─────────────────────────────────────────────

    def _get_timezone(self) -> str:
        """Get the configured timezone string."""
        return get_settings().scheduler.timezone

    def load_default_jobs(self) -> list[str]:
        """Add default jobs from settings.

        Reads scheduler config from settings and adds the standard ingest,
        recompute, scrape, eval, and full recompute jobs. A job is only
        added when its cron setting is non-empty.

        Returns:
            List of job IDs that were added.
        """
        settings = get_settings().scheduler
        job_ids: list[str] = []

        if settings.ingest_cron:
            job_ids.append(
                self.add_ingest_job(
                    cron_expr=settings.ingest_cron,
                    job_id="ingest_daily",
                )
            )

        if settings.recompute_cron:
            job_ids.append(
                self.add_recompute_job(
                    cron_expr=settings.recompute_cron,
                    job_id="recompute_weekly",
                )
            )

        if settings.scrape_cron:
            job_ids.append(
                self.add_scrape_job(
                    cron_expr=settings.scrape_cron,
                    job_id="scrape_hrnz_weekly",
                )
            )

        if settings.eval_cron:
            job_ids.append(
                self.add_eval_job(
                    cron_expr=settings.eval_cron,
                    job_id="eval_weekly",
                )
            )

        if settings.full_recompute_cron:
            job_ids.append(
                self.add_full_recompute_job(
                    cron_expr=settings.full_recompute_cron,
                    job_id="recompute_monthly_full",
                )
            )

        return job_ids
# ── Background job functions ─────────────────────────────────────────

# Scheduler that job functions use to send failure notifications.
# It stays None until something assigns a TipSharksScheduler to it.
_scheduler_instance: TipSharksScheduler | None = None


def _notify_failure(job_id: str, error: str) -> None:
    """Forward a job failure to the registered scheduler, if there is one.

    Args:
        job_id: Identifier of the job that failed.
        error: Error message describing the failure.
    """
    active = _scheduler_instance
    if active is None:
        # No scheduler registered: nothing can send the notification.
        return
    active._send_failure_notification(job_id, error)
def _run_ingest(
    date_from: str | None = None,
    date_to: str | None = None,
    category: str = "H",
    source: str = "tab",
) -> dict[str, int]:
    """Run one ingestion pass over a date window.

    Failures are logged, reported through ``_notify_failure`` and turned
    into an all-zero result with ``errors`` set to 1; nothing is raised.

    Args:
        date_from: Window start (YYYY-MM-DD). When omitted, today minus
            the ``ingest_days_back`` scheduler setting is used.
        date_to: Window end (YYYY-MM-DD). Defaults to today.
        category: Racing category (T, H, G).
        source: Data source ("tab" or "ingest").

    Returns:
        Dict with ``meetings``, ``races``, ``starters`` and ``errors`` counts.
    """
    config = get_settings()

    window_start = (
        date.fromisoformat(date_from)
        if date_from
        else date.today() - timedelta(days=config.scheduler.ingest_days_back)
    )
    window_end = date.fromisoformat(date_to) if date_to else date.today()

    logger.info(
        "Scheduled ingest starting",
        extra={
            "date_from": str(window_start),
            "date_to": str(window_end),
            "category": category,
            "source": source,
        },
    )

    try:
        # Imported lazily, at run time, as in the rest of the job functions.
        from packages.core.storage.ingestion import IngestionService

        with get_session() as session:
            service = IngestionService(session, source=source)
            meetings, races, starters = asyncio.run(
                service.ingest_date_range(window_start, window_end, category=category)
            )

        summary = dict(
            meetings=meetings,
            races=races,
            starters=starters,
            errors=service.stats["errors"],
        )
        logger.info("Scheduled ingest complete", extra=summary)
        return summary
    except Exception as exc:
        logger.error(
            "Scheduled ingest failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("ingest_daily", str(exc))
        return {"meetings": 0, "races": 0, "starters": 0, "errors": 1}
def _run_recompute(
    date_from: str | None = None,
    date_to: str | None = None,
    clear: bool = False,
) -> dict[str, int]:
    """Recompute ratings over a date window.

    Failures are logged, reported through ``_notify_failure`` and turned
    into a zero result; nothing is raised.

    Args:
        date_from: Window start (YYYY-MM-DD). When omitted, today minus
            the ``recompute_days_back`` scheduler setting is used.
        date_to: Window end (YYYY-MM-DD). Defaults to today.
        clear: If True, clear existing ratings.

    Returns:
        Dict with the ``snapshots_created`` count.
    """
    config = get_settings()

    window_start = (
        date.fromisoformat(date_from)
        if date_from
        else date.today() - timedelta(days=config.scheduler.recompute_days_back)
    )
    window_end = date.fromisoformat(date_to) if date_to else date.today()

    logger.info(
        "Scheduled recompute starting",
        extra={
            "date_from": str(window_start),
            "date_to": str(window_end),
            "clear": clear,
        },
    )

    try:
        # Imported lazily, at run time, as in the rest of the job functions.
        from packages.core.ratings.recompute import recompute_ratings

        with get_session() as session:
            created = recompute_ratings(
                session,
                window_start,
                window_end,
                clear_existing=clear,
            )

        summary = {"snapshots_created": created}
        logger.info("Scheduled recompute complete", extra=summary)
        return summary
    except Exception as exc:
        logger.error(
            "Scheduled recompute failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("recompute_weekly", str(exc))
        return {"snapshots_created": 0}
def _run_scrape(
    urls: list[str] | None = None,
    club_codes: list[str] | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
) -> dict[str, int]:
    """Execute an HRNZ scraping job.

    Scrapes each target URL, maps the result and upserts the meeting, its
    horses, drivers, trainers, races and starters, committing once per
    meeting. A failure on one URL is logged and counted in ``errors``
    without stopping the remaining URLs. A failure outside the per-URL
    loop is logged, reported through ``_notify_failure`` and turned into
    an all-zero result with ``errors`` set to 1.

    Args:
        urls: Specific HRNZ result URLs to scrape. The list is never
            modified.
        club_codes: HRNZ club codes to generate URLs for.
        date_from: Start date (YYYY-MM-DD). Defaults to today.
        date_to: End date (YYYY-MM-DD). Defaults to today.

    Returns:
        Dict with scraping stats (``meetings``, ``races``, ``starters``,
        ``horses``, ``drivers``, ``trainers``, ``errors``).
    """
    logger.info(
        "Scheduled scrape starting",
        extra={
            "urls_count": len(urls) if urls else 0,
            "club_codes": club_codes,
            "date_from": date_from,
            "date_to": date_to,
        },
    )

    try:
        from packages.core.storage.repositories import (
            DriverRepository,
            HorseRepository,
            MeetingRepository,
            RaceRepository,
            StarterRepository,
            TrainerRepository,
        )
        from packages.hrnz_scraper import HRNZScraper
        from packages.hrnz_scraper.mapper import HRNZDataMapper

        start_date = date.fromisoformat(date_from) if date_from else date.today()
        end_date = date.fromisoformat(date_to) if date_to else date.today()

        stats: dict[str, int] = {
            "meetings": 0,
            "races": 0,
            "starters": 0,
            "horses": 0,
            "drivers": 0,
            "trainers": 0,
            "errors": 0,
        }

        mapper = HRNZDataMapper()

        async def _scrape_all() -> dict[str, int]:
            async with HRNZScraper() as scraper:
                # Work on a copy. ``urls`` is the very list object stored in
                # the scheduled job's arguments, so extending it in place
                # would make the job's URL list grow on every run.
                target_urls: list[str] = list(urls) if urls else []
                if club_codes:
                    from apps.backend.api.main import _generate_hrnz_urls

                    generated = _generate_hrnz_urls(start_date, end_date, club_codes)
                    target_urls.extend(generated)

                if not target_urls:
                    logger.warning("No URLs provided for scrape job")
                    return stats

                # De-duplicate while keeping first-seen order.
                target_urls = list(dict.fromkeys(target_urls))

                for url in target_urls:
                    try:
                        scraped = await scraper.get_meeting_results(url)
                        if not scraped.get("races"):
                            continue

                        scraped_date = scraped.get("date")
                        if scraped_date:
                            try:
                                meeting_date = date.fromisoformat(scraped_date)
                                if not (start_date <= meeting_date <= end_date):
                                    continue
                            except ValueError:
                                # Unparseable date: the range check is
                                # skipped and the meeting is still stored.
                                pass

                        meeting = mapper.map_meeting(scraped)
                        entities = mapper.map_entities(scraped)

                        with get_session() as session:
                            MeetingRepository.upsert(session, meeting)
                            stats["meetings"] += 1

                            for horse in entities.get("horses", []):
                                HorseRepository.upsert(
                                    session,
                                    horse["id"],
                                    horse["name"],
                                    horse.get("raw_json"),
                                )
                                stats["horses"] += 1

                            for driver in entities.get("drivers", []):
                                DriverRepository.upsert(
                                    session, driver["name"], driver_id=driver.get("id")
                                )
                                stats["drivers"] += 1

                            for trainer in entities.get("trainers", []):
                                TrainerRepository.upsert(
                                    session,
                                    trainer["name"],
                                    trainer_id=trainer.get("id"),
                                )
                                stats["trainers"] += 1

                            races = mapper.map_races(scraped, meeting["meeting"])
                            # race_number -> persisted race id, needed to
                            # attach starters to the stored races.
                            race_id_map = {}
                            for race in races:
                                race_obj = RaceRepository.upsert(
                                    session, meeting["meeting"], race
                                )
                                race_id_map[race["race_number"]] = race_obj.id
                                stats["races"] += 1

                            starters = mapper.map_starters(scraped, race_id_map)
                            for starter in starters:
                                StarterRepository.upsert(
                                    session,
                                    starter["race_id"],
                                    starter,
                                    starter.get("placing"),
                                )
                                stats["starters"] += 1

                            session.commit()
                    except Exception as exc:
                        # Best effort per URL: count it, log it with the
                        # traceback, and move on to the next one.
                        stats["errors"] += 1
                        logger.error(
                            "Failed to scrape URL in scheduled job",
                            extra={"url": url, "error": str(exc)},
                            exc_info=True,
                        )

            return stats

        result = asyncio.run(_scrape_all())

        logger.info("Scheduled scrape complete", extra=result)
        return result
    except Exception as exc:
        logger.error(
            "Scheduled scrape failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("scrape_hrnz_weekly", str(exc))
        return {
            "meetings": 0,
            "races": 0,
            "starters": 0,
            "horses": 0,
            "drivers": 0,
            "trainers": 0,
            "errors": 1,
        }
def _run_eval(
    date_from: str | None = None,
    date_to: str | None = None,
) -> dict[str, Any]:
    """Execute a weekly evaluation job.

    Compares predictions with actual results for every race in the window
    that has at least one starter with a recorded placing. Accuracy and
    mean Brier score are averaged over the races that actually produced a
    comparison; races with no comparison, or whose comparison raised, are
    counted in ``races_skipped`` and do not dilute the averages.

    Failures of the job as a whole are logged, reported through
    ``_notify_failure`` and turned into a zero result; nothing is raised.

    Args:
        date_from: Start date (YYYY-MM-DD). Defaults to 7 days ago.
        date_to: End date (YYYY-MM-DD). Defaults to today.

    Returns:
        Dict with ``races_evaluated``, ``races_skipped``,
        ``winner_accuracy``, ``avg_brier_score``, ``date_from`` and
        ``date_to``. On failure only the first, third and fourth keys are
        present, all zero.
    """
    start_date = (
        date.fromisoformat(date_from)
        if date_from
        else date.today() - timedelta(days=7)
    )
    end_date = date.fromisoformat(date_to) if date_to else date.today()

    logger.info(
        "Scheduled evaluation starting",
        extra={"date_from": str(start_date), "date_to": str(end_date)},
    )

    try:
        from packages.core.ratings.predictions import PredictionEngine
        from packages.core.storage.models import Race, Starter

        with get_session() as session:
            engine = PredictionEngine(session)
            # NOTE(review): the upper bound compares race_datetime with a
            # bare date string; if race_datetime carries a time of day this
            # presumably excludes races run on end_date itself — confirm
            # against the column type before changing.
            races = (
                session.query(Race)
                .join(Starter, Race.id == Starter.race_id)
                .filter(Starter.placing.isnot(None))
                .filter(Race.race_datetime >= start_date.isoformat())
                .filter(Race.race_datetime <= end_date.isoformat())
                .distinct()
                .all()
            )

            evaluated = 0
            winner_correct = 0
            total_brier = 0.0

            for race in races:
                try:
                    comparison = engine.compare_prediction_to_actual(race.id)
                except Exception as exc:
                    # One bad race must not abort the whole evaluation, but
                    # it should leave a trace instead of vanishing silently.
                    logger.warning(
                        "Skipping race in scheduled evaluation",
                        extra={
                            "race_id": getattr(race, "id", None),
                            "error": str(exc),
                        },
                    )
                    continue
                if not comparison:
                    continue
                evaluated += 1
                if comparison.get("winner_correct"):
                    winner_correct += 1
                total_brier += comparison.get("brier_score", 0.0)

            # Average over compared races only; dividing by every race found
            # would treat skipped races as "wrong, Brier 0.0".
            avg_brier = total_brier / evaluated if evaluated > 0 else 0.0
            win_accuracy = winner_correct / evaluated if evaluated > 0 else 0.0

            stats = {
                "races_evaluated": evaluated,
                "races_skipped": len(races) - evaluated,
                "winner_accuracy": round(win_accuracy, 4),
                "avg_brier_score": round(avg_brier, 4),
                "date_from": str(start_date),
                "date_to": str(end_date),
            }
            logger.info("Scheduled evaluation complete", extra=stats)
            return stats
    except Exception as exc:
        logger.error(
            "Scheduled evaluation failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("eval_weekly", str(exc))
        return {"races_evaluated": 0, "winner_accuracy": 0.0, "avg_brier_score": 0.0}
def _run_full_recompute(
    date_from: str | None = None,
    date_to: str | None = None,
    clear: bool = True,
) -> dict[str, int]:
    """Run the monthly full ratings recompute over a date window.

    Failures are logged, reported through ``_notify_failure`` and turned
    into a zero result; nothing is raised.

    Args:
        date_from: Window start (YYYY-MM-DD). Defaults to 365 days ago.
        date_to: Window end (YYYY-MM-DD). Defaults to today.
        clear: If True, clear existing ratings before recompute.

    Returns:
        Dict with the ``snapshots_created`` count.
    """
    window_start = (
        date.fromisoformat(date_from)
        if date_from
        else date.today() - timedelta(days=365)
    )
    window_end = date.fromisoformat(date_to) if date_to else date.today()

    logger.info(
        "Scheduled full recompute starting",
        extra={
            "date_from": str(window_start),
            "date_to": str(window_end),
            "clear": clear,
        },
    )

    try:
        # Imported lazily, at run time, as in the rest of the job functions.
        from packages.core.ratings.recompute import recompute_ratings

        with get_session() as session:
            created = recompute_ratings(
                session,
                window_start,
                window_end,
                clear_existing=clear,
            )

        summary = {"snapshots_created": created}
        logger.info("Scheduled full recompute complete", extra=summary)
        return summary
    except Exception as exc:
        logger.error(
            "Scheduled full recompute failed", extra={"error": str(exc)}, exc_info=True
        )
        _notify_failure("recompute_monthly_full", str(exc))
        return {"snapshots_created": 0}