Coverage for packages / core / storage / ingestion.py: 12%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 08:37 +1200

1"""Ingestion service for TAB racing data. 

2 

3Supports two data sources: 

4 - "tab": Directly from the TAB Affiliates API (default, existing behaviour) 

5 - "ingest": From the tab-api-ingest TypeScript service REST API 

6""" 

7 

8import asyncio 

9from datetime import date 

10 

11from sqlalchemy.orm import Session 

12 

13from packages.core.common.logging import get_logger 

14from packages.core.storage.repositories import ( 

15 MeetingRepository, 

16 RaceRepository, 

17 StarterRepository, 

18) 

19from packages.ingest_client import IngestServiceClient 

20from packages.tab_client.client import get_client 

21 

22logger = get_logger(__name__) 

23 

24 

class IngestionService:
    """Service for ingesting race data into database.

    Supports two sources via the *source* parameter:
    - "tab" (default): fetches data from the TAB Affiliates API directly
    - "ingest": fetches data from the tab-api-ingest REST service

    The counters in ``stats`` accumulate for the lifetime of the instance, so
    the tuples returned by the ``ingest_*`` methods are running totals when
    the same service object is reused for several calls.
    """

    def __init__(self, session: Session, source: str = "tab"):
        """Initialize ingestion service.

        Args:
            session: Database session
            source: Data source — "tab" (TAB API direct) or "ingest" (tab-api-ingest service)

        Raises:
            ValueError: If source is not a recognised value
        """
        if source not in ("tab", "ingest"):
            raise ValueError(f"Unknown source '{source}'. Use 'tab' or 'ingest'.")

        self.session = session
        self.source = source
        self.stats = {
            "meetings": 0,
            "races": 0,
            "starters": 0,
            "errors": 0,
        }

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no session access)
    # ------------------------------------------------------------------

    @staticmethod
    def _iter_days(date_from: date, date_to: date) -> list[str]:
        """Return every day in [date_from, date_to] as ISO "YYYY-MM-DD" strings.

        An inverted range (date_to before date_from) yields an empty list.
        """
        return [
            date.fromordinal(ordinal).isoformat()
            for ordinal in range(date_from.toordinal(), date_to.toordinal() + 1)
        ]

    @staticmethod
    def _build_result_map(results) -> dict[str, int]:
        """Map entrant_id -> finishing position for the given result rows.

        Rows lacking an entrant_id or position are ignored, as are rows whose
        position cannot be converted to an int (one bad row must not abort
        the whole race). ``None`` is treated as "no results".
        """
        result_map: dict[str, int] = {}
        for result in results or []:
            entrant_id = result.get("entrant_id")
            position = result.get("position")
            if not (entrant_id and position):
                continue
            try:
                result_map[entrant_id] = int(position)
            except (TypeError, ValueError):
                # Non-numeric placing: leave the runner without a placing.
                continue
        return result_map

    @staticmethod
    def _build_event_data(full_race_data, runners) -> dict:
        """Assemble the {"race", "runners", "results"} dict used by _ingest_race.

        Runners fetched separately take precedence; if that list is empty the
        "runners" entry embedded in the race payload (if any) is used instead.
        Results always come from the race payload.

        Raises:
            TypeError: If the race payload is not a dict
        """
        if not isinstance(full_race_data, dict):
            raise TypeError(
                f"Expected race payload to be a dict, got {type(full_race_data).__name__}"
            )
        return {
            "race": full_race_data,
            "runners": runners or full_race_data.get("runners") or [],
            "results": full_race_data.get("results") or [],
        }

    # ------------------------------------------------------------------
    # Bookkeeping helpers
    # ------------------------------------------------------------------

    def _totals(self) -> tuple[int, int, int]:
        """Return the (meetings, races, starters) counters as a tuple."""
        return self.stats["meetings"], self.stats["races"], self.stats["starters"]

    def _record_error(self, message: str) -> None:
        """Log the active exception, reset the session and count the error.

        Must be called from inside an ``except`` block so that
        ``exc_info=True`` picks up the exception being handled. The rollback
        discards any half-written rows and returns the session to a usable
        state for the next meeting/race.
        """
        logger.error(message, exc_info=True)
        self.session.rollback()
        self.stats["errors"] += 1

    async def _process_meetings(self, client, meetings_data, ingest_one) -> None:
        """Run ``ingest_one(client, meeting)`` for each meeting, isolating failures.

        Args:
            client: API client handed through to ``ingest_one``
            meetings_data: Iterable of meeting dicts
            ingest_one: Coroutine function taking (client, meeting_data)
        """
        for meeting_data in meetings_data:
            try:
                await ingest_one(client, meeting_data)
            except Exception as e:
                self._record_error(
                    f"Error ingesting meeting {meeting_data.get('meeting')}: {e}"
                )

    def _upsert_meeting(self, meeting_data: dict) -> str | None:
        """Upsert and commit one meeting; return its ID, or None if it has no ID."""
        # Both sources use the "meeting" field for the meeting ID (string)
        meeting_id = meeting_data.get("meeting")
        if not meeting_id:
            logger.warning("Skipping meeting with no ID")
            return None

        meeting = MeetingRepository.upsert(self.session, meeting_data)
        self.session.commit()
        # Count only after the commit succeeded.
        self.stats["meetings"] += 1

        logger.info(
            f"Processing meeting {meeting_id} ({meeting.venue}, {meeting.meeting_date}, {meeting.category})"
        )
        return meeting_id

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def ingest_date_range(
        self,
        date_from: date,
        date_to: date,
        category: str | None = None,
        country: str | None = None,
    ) -> tuple[int, int, int]:
        """Ingest all meetings and races in date range.

        Args:
            date_from: Start date (inclusive)
            date_to: End date (inclusive)
            category: Racing category (T, H, G) — uses default from settings if None
            country: Country filter — uses default from settings if None

        Returns:
            Tuple of (meetings_count, races_count, starters_count)
        """
        if self.source == "ingest":
            return await self._ingest_from_service(
                date_from, date_to, category, country
            )

        logger.info(f"Starting TAB ingestion from {date_from} to {date_to}")

        async with get_client() as client:
            # Fetch meetings in range
            meetings_data = await client.get_meetings(
                date_from.strftime("%Y-%m-%d"),
                date_to.strftime("%Y-%m-%d"),
                category=category,
                country=country,
            )

            logger.info(f"Found {len(meetings_data)} meetings to process")

            await self._process_meetings(client, meetings_data, self._ingest_meeting)

        logger.info(
            f"Ingestion complete: {self.stats['meetings']} meetings, "
            f"{self.stats['races']} races, {self.stats['starters']} starters, "
            f"{self.stats['errors']} errors"
        )

        return self._totals()

    async def ingest_single_date(
        self,
        target_date: date,
        category: str | None = None,
        country: str | None = None,
    ) -> tuple[int, int, int]:
        """Ingest meetings for a single date.

        Args:
            target_date: Date to ingest
            category: Racing category (T, H, G) — uses default from settings if None
            country: Country filter — uses default from settings if None

        Returns:
            Tuple of (meetings_count, races_count, starters_count)
        """
        return await self.ingest_date_range(
            target_date, target_date, category=category, country=country
        )

    # ------------------------------------------------------------------
    # Source-specific ingestion
    # ------------------------------------------------------------------

    async def _ingest_from_service(
        self,
        date_from: date,
        date_to: date,
        category: str | None = None,
        country: str | None = None,
    ) -> tuple[int, int, int]:
        """Ingest from the tab-api-ingest service.

        Uses IngestServiceClient instead of the TAB API directly. The client
        is queried once per day with a single ``date`` argument, so every day
        of the inclusive range is requested in turn; each returned meeting is
        then processed with its races and runners.

        Args:
            date_from: Start date (inclusive)
            date_to: End date (inclusive)
            category: Racing category (T, H, G)
            country: Country filter

        Returns:
            Tuple of (meetings_count, races_count, starters_count)
        """
        logger.info(f"Starting ingest-service ingestion from {date_from} to {date_to}")

        days = self._iter_days(date_from, date_to)
        if not days:
            logger.warning(
                f"Empty date range ({date_from} to {date_to}); nothing to ingest"
            )

        async with IngestServiceClient() as client:
            for day in days:
                # Fetch meetings for this day
                meetings_data = await client.get_meetings(
                    date=day,
                    country=country,
                    category=category,
                )

                logger.info(
                    f"Found {len(meetings_data)} meetings from ingest service for {day}"
                )

                await self._process_meetings(
                    client, meetings_data, self._ingest_meeting_from_service
                )

        logger.info(
            f"Ingest-service ingestion complete: {self.stats['meetings']} meetings, "
            f"{self.stats['races']} races, {self.stats['starters']} starters, "
            f"{self.stats['errors']} errors"
        )

        return self._totals()

    async def _ingest_meeting(self, client, meeting_data: dict) -> None:
        """Ingest single meeting using a TAB-style client.

        Args:
            client: TAB API client (real or mock)
            meeting_data: Meeting data from API (includes races array with summaries)
        """
        meeting_id = self._upsert_meeting(meeting_data)
        if not meeting_id:
            return

        # TAB API includes races array with summaries in the meeting data
        # Each race has "id" (event_id) and "race_number"
        races_list = meeting_data.get("races") or []

        if not races_list:
            logger.warning(f"No races found in meeting {meeting_id}")
            return

        logger.info(f"Found {len(races_list)} races in meeting {meeting_id}")

        for race_summary in races_list:
            try:
                # TAB race summary has "id" which is the event_id
                event_id = race_summary.get("id")
                race_number = race_summary.get("race_number")

                if not event_id:
                    logger.warning(
                        f"Skipping race {race_number} with no event_id in meeting {meeting_id}"
                    )
                    continue

                # Fetch full event/race data with runners and results
                event_data = await client.get_event(event_id)
                await self._ingest_race(meeting_id, event_data)

                # Small delay to be polite to API
                await asyncio.sleep(0.1)

            except Exception as e:
                race_num = race_summary.get("race_number", "?")
                self._record_error(f"Error ingesting race {meeting_id}/{race_num}: {e}")

    async def _ingest_meeting_from_service(
        self,
        client: IngestServiceClient,
        meeting_data: dict,
    ) -> None:
        """Ingest single meeting using the ingest service client.

        Differs from _ingest_meeting in that race data is fetched via
        the ingest service's own REST endpoints rather than the TAB API.

        Args:
            client: IngestServiceClient instance
            meeting_data: Meeting data from ingest service
        """
        meeting_id = self._upsert_meeting(meeting_data)
        if not meeting_id:
            return

        # Get races for this meeting from the ingest service
        races_data = await client.get_races(meeting_id=meeting_id)

        if not races_data:
            logger.warning(f"No races found for meeting {meeting_id}")
            return

        logger.info(f"Found {len(races_data)} races for meeting {meeting_id}")

        for race_summary in races_data:
            try:
                race_id = race_summary.get("id") or str(race_summary.get("race_id", ""))
                race_number = race_summary.get("race_number")

                if not race_id:
                    logger.warning(
                        f"Skipping race {race_number} with no ID in meeting {meeting_id}"
                    )
                    continue

                # Fetch full race data and runners from the ingest service
                full_race_data = await client.get_race(race_id)
                runners = await client.get_runners(race_id)

                # Construct event_data compatible with _ingest_race
                event_data = self._build_event_data(full_race_data, runners)

                await self._ingest_race(meeting_id, event_data)

                # Small delay to be polite
                await asyncio.sleep(0.1)

            except Exception as e:
                race_num = race_summary.get("race_number", "?")
                self._record_error(f"Error ingesting race {meeting_id}/{race_num}: {e}")

    async def _ingest_race(self, meeting_id: str, event_data: dict) -> None:
        """Ingest single race with all starters.

        Works with event data from either the TAB API or the ingest service.
        Expects event_data to have keys: "race", "runners", "results".
        Missing or ``None`` values for any of them are treated as empty.

        The race and starter counters are only updated once the commit for
        the race has succeeded. A failure on one starter is logged and
        counted but does not stop the remaining starters.

        Args:
            meeting_id: Parent meeting ID (string)
            event_data: Full event data (includes race, runners, results)
        """
        # TAB API returns: {race: {...}, runners: [...], results: [...], ...}
        # Ingest service: same shape constructed by _ingest_meeting_from_service
        race_data = event_data.get("race") or {}

        race_number = race_data.get("race_number")
        if not race_number:
            logger.warning(f"Skipping race with no number in meeting {meeting_id}")
            return

        # Upsert race
        race = RaceRepository.upsert(self.session, meeting_id, race_data)

        runners = event_data.get("runners") or []
        # Lookup of finishing position by entrant_id
        result_map = self._build_result_map(event_data.get("results"))

        logger.debug(
            f"Processing {len(runners)} starters for race {meeting_id}/{race_number} "
            f"({len(result_map)} results)"
        )

        starters_saved = 0
        for runner_data in runners:
            try:
                # Match result to runner by entrant_id
                entrant_id = runner_data.get("entrant_id")
                placing = result_map.get(entrant_id) if entrant_id else None

                starter = StarterRepository.upsert(
                    self.session, race.id, runner_data, placing=placing
                )
                if starter:  # None if scratched
                    starters_saved += 1
            except Exception as e:
                # No rollback here: it would also discard the race row and the
                # starters already staged for this race.
                logger.error(
                    f"Error ingesting starter in race {race.id}: {e}",
                    exc_info=True,
                )
                self.stats["errors"] += 1

        # Commit after each race, then count what was actually persisted
        self.session.commit()
        self.stats["races"] += 1
        self.stats["starters"] += starters_saved

382 

383 

async def ingest_meetings(
    session: Session,
    date_from: date,
    date_to: date,
    category: str | None = None,
    country: str | None = None,
    source: str = "tab",
) -> tuple[int, int, int]:
    """Build an IngestionService and run a date-range ingestion with it.

    Args:
        session: Database session
        date_from: Start date
        date_to: End date
        category: Racing category (T, H, G) — uses default from settings if None
        country: Country filter — uses default from settings if None
        source: Data source — "tab" (default) or "ingest"

    Returns:
        Tuple of (meetings_count, races_count, starters_count)
    """
    # A fresh service per call, so the returned counters start from zero.
    ingestion = IngestionService(session, source=source)
    totals = await ingestion.ingest_date_range(
        date_from,
        date_to,
        category=category,
        country=country,
    )
    return totals