Coverage for packages / core / storage / ingestion.py: 12%
139 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:37 +1200
1"""Ingestion service for TAB racing data.
3Supports two data sources:
4 - "tab": Directly from the TAB Affiliates API (default, existing behaviour)
5 - "ingest": From the tab-api-ingest TypeScript service REST API
6"""
8import asyncio
9from datetime import date
11from sqlalchemy.orm import Session
13from packages.core.common.logging import get_logger
14from packages.core.storage.repositories import (
15 MeetingRepository,
16 RaceRepository,
17 StarterRepository,
18)
19from packages.ingest_client import IngestServiceClient
20from packages.tab_client.client import get_client
22logger = get_logger(__name__)
class IngestionService:
    """Service for ingesting race data into database.

    Supports two sources via the *source* parameter:
    - "tab" (default): fetches data from the TAB Affiliates API directly
    - "ingest": fetches data from the tab-api-ingest REST service

    The counters in ``self.stats`` accumulate for the lifetime of the
    instance (they are not reset between calls) and only count meetings,
    races and starters whose transaction was committed.
    """

    def __init__(self, session: Session, source: str = "tab"):
        """Initialize ingestion service.

        Args:
            session: Database session
            source: Data source — "tab" (TAB API direct) or "ingest" (tab-api-ingest service)

        Raises:
            ValueError: If source is not a recognised value
        """
        if source not in ("tab", "ingest"):
            raise ValueError(f"Unknown source '{source}'. Use 'tab' or 'ingest'.")

        self.session = session
        self.source = source
        self.stats = {
            "meetings": 0,
            "races": 0,
            "starters": 0,
            "errors": 0,
        }

    async def ingest_date_range(
        self,
        date_from: date,
        date_to: date,
        category: str | None = None,
        country: str | None = None,
    ) -> tuple[int, int, int]:
        """Ingest all meetings and races in date range.

        Dispatches to the ingest-service path when ``self.source`` is
        "ingest"; otherwise fetches from the TAB API client. A failure in one
        meeting is logged and counted in ``stats["errors"]`` without stopping
        the remaining meetings.

        Args:
            date_from: Start date (inclusive)
            date_to: End date (inclusive)
            category: Racing category (T, H, G) — uses default from settings if None
            country: Country filter — uses default from settings if None

        Returns:
            Tuple of (meetings_count, races_count, starters_count)
        """
        if self.source == "ingest":
            return await self._ingest_from_service(
                date_from, date_to, category, country
            )

        logger.info(f"Starting TAB ingestion from {date_from} to {date_to}")

        async with get_client() as client:
            # Fetch meetings in range
            meetings_data = await client.get_meetings(
                date_from.strftime("%Y-%m-%d"),
                date_to.strftime("%Y-%m-%d"),
                category=category,
                country=country,
            )

            logger.info(f"Found {len(meetings_data)} meetings to process")

            for meeting_data in meetings_data:
                try:
                    await self._ingest_meeting(client, meeting_data)
                except Exception as e:
                    logger.error(
                        f"Error ingesting meeting {meeting_data.get('meeting')}: {e}",
                        exc_info=True,
                    )
                    self.stats["errors"] += 1
                    # Discard the failed transaction so the next meeting
                    # starts from a usable session.
                    self._rollback_session()

        logger.info(
            f"Ingestion complete: {self.stats['meetings']} meetings, "
            f"{self.stats['races']} races, {self.stats['starters']} starters, "
            f"{self.stats['errors']} errors"
        )

        return self.stats["meetings"], self.stats["races"], self.stats["starters"]

    async def ingest_single_date(
        self,
        target_date: date,
        category: str | None = None,
        country: str | None = None,
    ) -> tuple[int, int, int]:
        """Ingest meetings for a single date.

        Args:
            target_date: Date to ingest
            category: Racing category (T, H, G) — uses default from settings if None
            country: Country filter — uses default from settings if None

        Returns:
            Tuple of (meetings_count, races_count, starters_count)
        """
        return await self.ingest_date_range(
            target_date, target_date, category=category, country=country
        )

    async def _ingest_from_service(
        self,
        date_from: date,
        date_to: date,
        category: str | None = None,
        country: str | None = None,
    ) -> tuple[int, int, int]:
        """Ingest from the tab-api-ingest service.

        Uses IngestServiceClient instead of the TAB API directly.
        The client is queried once per calendar day (its ``get_meetings`` is
        called with a single ``date`` argument), so every day from
        ``date_from`` to ``date_to`` inclusive is requested in turn. An empty
        range (``date_to`` earlier than ``date_from``) fetches nothing.

        Args:
            date_from: Start date (inclusive)
            date_to: End date (inclusive)
            category: Racing category (T, H, G)
            country: Country filter

        Returns:
            Tuple of (meetings_count, races_count, starters_count)
        """
        logger.info(f"Starting ingest-service ingestion from {date_from} to {date_to}")

        async with IngestServiceClient() as client:
            # Walk the range by ordinal so only `date` is needed; previously
            # only `date_from` was requested and `date_to` was ignored.
            for ordinal in range(date_from.toordinal(), date_to.toordinal() + 1):
                day = date.fromordinal(ordinal)

                meetings_data = await client.get_meetings(
                    date=str(day),
                    country=country,
                    category=category,
                )

                logger.info(f"Found {len(meetings_data)} meetings from ingest service")

                for meeting_data in meetings_data:
                    try:
                        await self._ingest_meeting_from_service(client, meeting_data)
                    except Exception as e:
                        logger.error(
                            f"Error ingesting meeting {meeting_data.get('meeting')}: {e}",
                            exc_info=True,
                        )
                        self.stats["errors"] += 1
                        self._rollback_session()

        logger.info(
            f"Ingest-service ingestion complete: {self.stats['meetings']} meetings, "
            f"{self.stats['races']} races, {self.stats['starters']} starters, "
            f"{self.stats['errors']} errors"
        )

        return self.stats["meetings"], self.stats["races"], self.stats["starters"]

    def _rollback_session(self) -> None:
        """Roll back the current transaction without raising.

        Called from the error handlers so that a failed meeting/race neither
        leaves the session in a failed state nor lets its partial writes be
        committed together with the next race. A failing rollback is logged
        rather than raised to keep ingestion best-effort.
        """
        try:
            self.session.rollback()
        except Exception as e:
            logger.error(f"Session rollback failed: {e}", exc_info=True)

    def _upsert_meeting(self, meeting_data: dict) -> str | None:
        """Upsert and commit one meeting; return its ID, or None if it has none.

        The "meetings" counter is bumped only after the commit succeeded.
        """
        # Both sources use the "meeting" field for the meeting ID (string)
        meeting_id = meeting_data.get("meeting")
        if not meeting_id:
            logger.warning("Skipping meeting with no ID")
            return None

        meeting = MeetingRepository.upsert(self.session, meeting_data)
        self.session.commit()
        self.stats["meetings"] += 1

        logger.info(
            f"Processing meeting {meeting_id} ({meeting.venue}, {meeting.meeting_date}, {meeting.category})"
        )
        return meeting_id

    async def _ingest_meeting(self, client, meeting_data: dict) -> None:
        """Ingest single meeting using a TAB-style client.

        Race-level failures are logged, counted and rolled back; they do not
        abort the remaining races of the meeting.

        Args:
            client: TAB API client (real or mock)
            meeting_data: Meeting data from API (includes races array with summaries)
        """
        meeting_id = self._upsert_meeting(meeting_data)
        if not meeting_id:
            return

        # TAB API includes races array with summaries in the meeting data
        # Each race has "id" (event_id) and "race_number"
        races_list = meeting_data.get("races", [])

        if not races_list:
            logger.warning(f"No races found in meeting {meeting_id}")
            return

        logger.info(f"Found {len(races_list)} races in meeting {meeting_id}")

        for race_summary in races_list:
            try:
                # TAB race summary has "id" which is the event_id
                event_id = race_summary.get("id")
                race_number = race_summary.get("race_number")

                if not event_id:
                    logger.warning(
                        f"Skipping race {race_number} with no event_id in meeting {meeting_id}"
                    )
                    continue

                # Fetch full event/race data with runners and results
                event_data = await client.get_event(event_id)
                await self._ingest_race(meeting_id, event_data)

                # Small delay to be polite to API
                await asyncio.sleep(0.1)

            except Exception as e:
                race_num = race_summary.get("race_number", "?")
                logger.error(
                    f"Error ingesting race {meeting_id}/{race_num}: {e}",
                    exc_info=True,
                )
                self.stats["errors"] += 1
                self._rollback_session()

    async def _ingest_meeting_from_service(
        self,
        client: IngestServiceClient,
        meeting_data: dict,
    ) -> None:
        """Ingest single meeting using the ingest service client.

        Differs from _ingest_meeting in that race data is fetched via
        the ingest service's own REST endpoints rather than the TAB API.

        Args:
            client: IngestServiceClient instance
            meeting_data: Meeting data from ingest service
        """
        meeting_id = self._upsert_meeting(meeting_data)
        if not meeting_id:
            return

        # Get races for this meeting from the ingest service
        races_data = await client.get_races(meeting_id=meeting_id)

        if not races_data:
            logger.warning(f"No races found for meeting {meeting_id}")
            return

        logger.info(f"Found {len(races_data)} races for meeting {meeting_id}")

        for race_summary in races_data:
            try:
                race_id = race_summary.get("id")
                if not race_id:
                    # Fall back to "race_id"; an explicit None must stay
                    # falsy instead of becoming the string "None".
                    fallback_id = race_summary.get("race_id")
                    race_id = "" if fallback_id is None else str(fallback_id)
                race_number = race_summary.get("race_number")

                if not race_id:
                    logger.warning(
                        f"Skipping race {race_number} with no ID in meeting {meeting_id}"
                    )
                    continue

                # Fetch full race data and runners from the ingest service
                full_race_data = await client.get_race(race_id)
                runners = await client.get_runners(race_id)

                if not isinstance(full_race_data, dict):
                    # Handled by the except below: logged and counted as a
                    # race error, like the attribute error it replaces.
                    raise TypeError(
                        f"Unexpected race payload type {type(full_race_data).__name__}"
                    )

                # Construct event_data compatible with _ingest_race. Runners
                # embedded in the race payload are used only when the
                # dedicated runners call returned nothing.
                event_data = {
                    "race": full_race_data,
                    "runners": runners or full_race_data.get("runners") or [],
                    "results": full_race_data.get("results") or [],
                }

                await self._ingest_race(meeting_id, event_data)

                # Small delay to be polite
                await asyncio.sleep(0.1)

            except Exception as e:
                race_num = race_summary.get("race_number", "?")
                logger.error(
                    f"Error ingesting race {meeting_id}/{race_num}: {e}",
                    exc_info=True,
                )
                self.stats["errors"] += 1
                self._rollback_session()

    async def _ingest_race(self, meeting_id: str, event_data: dict) -> None:
        """Ingest single race with all starters.

        Works with event data from either the TAB API or the ingest service.
        Expects event_data to have keys: "race", "runners", "results"; a
        missing or None value is treated as empty. The race and its starters
        are committed together, and the "races"/"starters" counters are
        updated only after that commit succeeds. Exceptions other than
        per-starter upsert failures propagate to the caller.

        Args:
            meeting_id: Parent meeting ID (string)
            event_data: Full event data (includes race, runners, results)
        """
        # TAB API returns: {race: {...}, runners: [...], results: [...], ...}
        # Ingest service: same shape constructed by _ingest_meeting_from_service
        race_data = event_data.get("race") or {}

        race_number = race_data.get("race_number")
        if not race_number:
            logger.warning(f"Skipping race with no number in meeting {meeting_id}")
            return

        race = RaceRepository.upsert(self.session, meeting_id, race_data)

        runners = event_data.get("runners") or []
        results = event_data.get("results") or []

        # Lookup of finishing position by entrant_id
        result_map: dict[str, int] = {}
        for result in results:
            entrant_id = result.get("entrant_id")
            position = result.get("position")
            if entrant_id and position:
                result_map[entrant_id] = int(position)

        logger.debug(
            f"Processing {len(runners)} starters for race {meeting_id}/{race_number} "
            f"({len(result_map)} results)"
        )

        starters_ingested = 0
        for runner_data in runners:
            try:
                # Match result to runner by entrant_id
                entrant_id = runner_data.get("entrant_id")
                placing = result_map.get(entrant_id) if entrant_id else None

                starter = StarterRepository.upsert(
                    self.session, race.id, runner_data, placing=placing
                )
                if starter:  # None if scratched
                    starters_ingested += 1
            except Exception as e:
                logger.error(
                    f"Error ingesting starter in race {race.id}: {e}",
                    exc_info=True,
                )
                self.stats["errors"] += 1

        # Commit after each race, then record what was actually persisted
        self.session.commit()
        self.stats["races"] += 1
        self.stats["starters"] += starters_ingested
async def ingest_meetings(
    session: Session,
    date_from: date,
    date_to: date,
    category: str | None = None,
    country: str | None = None,
    source: str = "tab",
) -> tuple[int, int, int]:
    """Run a one-off ingestion over a date range.

    Builds an IngestionService for the given session and source, then
    delegates to its ``ingest_date_range`` method.

    Args:
        session: Database session
        date_from: Start date
        date_to: End date
        category: Racing category (T, H, G) — uses default from settings if None
        country: Country filter — uses default from settings if None
        source: Data source — "tab" (default) or "ingest"

    Returns:
        Tuple of (meetings_count, races_count, starters_count)
    """
    ingestion = IngestionService(session, source=source)
    counts = await ingestion.ingest_date_range(
        date_from,
        date_to,
        category=category,
        country=country,
    )
    return counts