Coverage for packages / core / common / data_quality.py: 66%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 08:37 +1200

1"""Data quality validation and monitoring for harness racing data.""" 

2 

3from dataclasses import dataclass, field 

4from datetime import date, datetime 

5 

6from sqlalchemy import func 

7from sqlalchemy.orm import Session 

8 

9from packages.core.common.logging import get_logger 

10from packages.core.storage.models import Meeting, Race, Starter 

11 

12logger = get_logger(__name__) 

13 

14 

15@dataclass 

16class ValidationIssue: 

17 """Represents a data quality issue.""" 

18 

19 severity: str # "error", "warning", "info" 

20 category: str # "placing", "missing_data", "suspicious", "completeness" 

21 message: str 

22 race_id: int | None = None 

23 meeting_id: int | None = None 

24 starter_id: int | None = None 

25 details: dict = field(default_factory=dict) 

26 

27 

@dataclass
class DataQualityReport:
    """Result of a data quality assessment over a date range.

    Attributes:
        start_date: First day covered by the report.
        end_date: Last day covered by the report.
        total_meetings: Number of meetings examined.
        total_races: Number of races examined.
        total_starters: Number of starters examined.
        issues: Every issue found; defaults to a fresh empty list.
        metrics: Aggregate metrics; defaults to a fresh empty dict.
        generated_at: Creation timestamp, taken from ``datetime.now`` when
            not supplied.
    """

    start_date: date
    end_date: date
    total_meetings: int
    total_races: int
    total_starters: int
    issues: list[ValidationIssue] = field(default_factory=list)
    metrics: dict = field(default_factory=dict)
    generated_at: datetime = field(default_factory=datetime.now)

    @property
    def error_count(self) -> int:
        """Number of issues whose severity is "error"."""
        errors = [entry for entry in self.issues if entry.severity == "error"]
        return len(errors)

    @property
    def warning_count(self) -> int:
        """Number of issues whose severity is "warning"."""
        warnings = [entry for entry in self.issues if entry.severity == "warning"]
        return len(warnings)

    @property
    def has_errors(self) -> bool:
        """True when at least one issue has severity "error"."""
        return any(entry.severity == "error" for entry in self.issues)

55 

56 

class DataQualityValidator:
    """Validates data quality for harness racing data.

    The numeric thresholds used by the checks are class attributes so that a
    subclass can tune them without touching the check logic.
    """

    # Fields with fewer starters than this are flagged as suspicious.
    MIN_FIELD_SIZE = 3
    # Share of the field that may be DNF before the race is flagged.
    MAX_DNF_RATE = 0.5
    # Handicaps (in metres) above this value are reported as unusual.
    MAX_HANDICAP_M = 100
    # Tolerated difference between the highest placing and the number of
    # finishers; small differences are allowed for DNF cases.
    MAX_PLACING_GAP = 2

    def __init__(self, session: Session):
        """Initialize validator.

        Args:
            session: Database session
        """
        self.session = session

    def validate_race(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Validate a single race and its starters.

        Runs the placing, completeness and suspicious-result checks, in that
        order, and concatenates their findings.

        Args:
            race: Race to validate
            starters: List of starters in the race

        Returns:
            List of validation issues found
        """
        issues: list[ValidationIssue] = []
        for check in (
            self._validate_placings,
            self._validate_completeness,
            self._detect_suspicious_results,
        ):
            issues.extend(check(race, starters))
        return issues

    def _validate_placings(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Validate the placing sequence of the finishers.

        Only starters with a placing that are not marked ``did_not_finish``
        are considered. Checks:
        - At least one finisher exists (otherwise a single warning is returned)
        - No duplicate placings
        - First place exists
        - Highest placing does not exceed the finisher count by more than
          ``MAX_PLACING_GAP``
        - Placings are positive

        Args:
            race: Race instance
            starters: List of starters

        Returns:
            List of validation issues
        """
        issues: list[ValidationIssue] = []

        finished = [
            s for s in starters if s.placing is not None and not s.did_not_finish
        ]
        if not finished:
            issues.append(
                ValidationIssue(
                    severity="warning",
                    category="placing",
                    message="No finishers in race",
                    race_id=race.id,
                    details={"starter_count": len(starters)},
                )
            )
            return issues

        placings = [s.placing for s in finished]

        # Single pass to find repeated placings; sorted so that the message
        # and details are deterministic.
        seen = set()
        repeated = set()
        for placing in placings:
            if placing in seen:
                repeated.add(placing)
            else:
                seen.add(placing)
        if repeated:
            duplicates = sorted(repeated)
            issues.append(
                ValidationIssue(
                    severity="error",
                    category="placing",
                    message=f"Duplicate placings found: {duplicates}",
                    race_id=race.id,
                    details={"duplicates": duplicates},
                )
            )

        if 1 not in seen:
            issues.append(
                ValidationIssue(
                    severity="error",
                    category="placing",
                    message="No first place finisher",
                    race_id=race.id,
                    details={"placings": sorted(placings)},
                )
            )

        # With N finishers the ideal sequence is 1..N; the amount by which the
        # highest placing overshoots N measures how many positions are missing.
        sorted_placings = sorted(placings)
        expected = list(range(1, len(placings) + 1))
        max_gap = sorted_placings[-1] - len(sorted_placings)
        if max_gap > self.MAX_PLACING_GAP:
            issues.append(
                ValidationIssue(
                    severity="warning",
                    category="placing",
                    message=f"Large gap in placing sequence (gap={max_gap})",
                    race_id=race.id,
                    details={"placings": sorted_placings, "expected": expected},
                )
            )

        for starter in finished:
            if starter.placing <= 0:
                issues.append(
                    ValidationIssue(
                        severity="error",
                        category="placing",
                        message=f"Invalid placing value: {starter.placing}",
                        race_id=race.id,
                        starter_id=starter.id,
                    )
                )

        return issues

    def _validate_completeness(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Validate data completeness.

        Checks, in this order:
        - Starters without a horse_id (error)
        - Missing driver assignments (warning)
        - Missing trainer assignments (warning)
        - Missing barrier positions (warning)

        Args:
            race: Race instance
            starters: List of starters

        Returns:
            List of validation issues, at most one per check
        """
        # (predicate for "missing", severity, label used in the message).
        # The id fields count as missing when falsy; barrier only when None.
        checks = (
            (lambda s: not s.horse_id, "error", "horse_id"),
            (lambda s: not s.driver_id, "warning", "driver assignment"),
            (lambda s: not s.trainer_id, "warning", "trainer assignment"),
            (lambda s: s.barrier is None, "warning", "barrier position"),
        )

        issues: list[ValidationIssue] = []
        for is_missing, severity, label in checks:
            missing = sum(1 for s in starters if is_missing(s))
            if missing > 0:
                issues.append(
                    ValidationIssue(
                        severity=severity,
                        category="missing_data",
                        message=f"{missing} starters missing {label}",
                        race_id=race.id,
                        details={"count": missing},
                    )
                )
        return issues

    def _detect_suspicious_results(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Detect suspicious or anomalous results.

        Checks:
        - Very few starters (fewer than ``MIN_FIELD_SIZE``)
        - Too many DNFs (more than ``MAX_DNF_RATE`` of the field)
        - Unusual handicap values (largest above ``MAX_HANDICAP_M``)

        Args:
            race: Race instance
            starters: List of starters

        Returns:
            List of validation issues
        """
        issues: list[ValidationIssue] = []
        field_size = len(starters)

        if field_size < self.MIN_FIELD_SIZE:
            issues.append(
                ValidationIssue(
                    severity="warning",
                    category="suspicious",
                    message=f"Very small field size: {field_size} starters",
                    race_id=race.id,
                    details={"starter_count": field_size},
                )
            )

        # Guard against an empty field before dividing.
        if field_size > 0:
            dnf_count = sum(1 for s in starters if s.did_not_finish)
            dnf_rate = dnf_count / field_size
            if dnf_rate > self.MAX_DNF_RATE:
                issues.append(
                    ValidationIssue(
                        severity="warning",
                        category="suspicious",
                        message=(
                            f"High DNF rate: {dnf_rate:.1%} ({dnf_count}/{field_size})"
                        ),
                        race_id=race.id,
                        details={"dnf_count": dnf_count, "dnf_rate": dnf_rate},
                    )
                )

        handicaps = [s.handicap_m for s in starters if s.handicap_m is not None]
        if handicaps:
            max_handicap = max(handicaps)
            if max_handicap > self.MAX_HANDICAP_M:
                issues.append(
                    ValidationIssue(
                        severity="info",
                        category="suspicious",
                        message=f"Unusually large handicap: {max_handicap}m",
                        race_id=race.id,
                        details={"max_handicap": max_handicap},
                    )
                )

        return issues

    def generate_report(self, from_date: date, to_date: date) -> DataQualityReport:
        """Generate comprehensive data quality report for date range.

        Args:
            from_date: Start date (inclusive)
            to_date: End date (inclusive)

        Returns:
            DataQualityReport with all issues and metrics
        """
        logger.info(f"Generating data quality report from {from_date} to {to_date}")

        # Ordered by date so the issue list is deterministic across runs
        # instead of following whatever row order the database returns.
        meetings = (
            self.session.query(Meeting)
            .filter(
                Meeting.meeting_date >= from_date,
                Meeting.meeting_date <= to_date,
            )
            .order_by(Meeting.meeting_date)
            .all()
        )

        all_issues: list[ValidationIssue] = []
        total_races = 0
        total_starters = 0

        for meeting in meetings:
            for race in meeting.races:
                starters = race.starters
                total_races += 1
                total_starters += len(starters)
                all_issues.extend(self.validate_race(race, starters))

        metrics = self._compute_metrics(meetings, total_races, total_starters)

        report = DataQualityReport(
            start_date=from_date,
            end_date=to_date,
            total_meetings=len(meetings),
            total_races=total_races,
            total_starters=total_starters,
            issues=all_issues,
            metrics=metrics,
        )

        logger.info(
            f"Data quality report complete: {report.error_count} errors, "
            f"{report.warning_count} warnings across {total_races} races"
        )

        return report

    def _count_starters_with(self, column, first_date: date, last_date: date) -> int:
        """Count starters with a non-NULL ``column`` in a meeting date window.

        Args:
            column: Starter column that must be non-NULL
            first_date: Earliest meeting date (inclusive)
            last_date: Latest meeting date (inclusive)

        Returns:
            Number of matching starters (0 when the query yields no value)
        """
        count = (
            self.session.query(func.count(Starter.id))
            .join(Starter.race)
            .join(Race.meeting)
            .filter(
                Meeting.meeting_date >= first_date,
                Meeting.meeting_date <= last_date,
                column.isnot(None),
            )
            .scalar()
        )
        return count or 0

    def _compute_metrics(
        self, meetings: list[Meeting], total_races: int, total_starters: int
    ) -> dict:
        """Compute data quality metrics.

        Args:
            meetings: List of meetings (in any order)
            total_races: Total race count
            total_starters: Total starter count

        Returns:
            Dictionary with ``avg_races_per_meeting``,
            ``avg_starters_per_race``, ``driver_assignment_rate`` and
            ``trainer_assignment_rate``; empty when there are no meetings or
            no races.
        """
        if not meetings or total_races == 0:
            return {}

        # Derive the date window from the data itself rather than from the
        # first/last list positions: the caller's list is not guaranteed to
        # be sorted, and an unsorted list would yield a wrong (possibly
        # inverted) window and therefore wrong completeness rates.
        meeting_dates = [m.meeting_date for m in meetings]
        first_date = min(meeting_dates)
        last_date = max(meeting_dates)

        total_with_driver = self._count_starters_with(
            Starter.driver_id, first_date, last_date
        )
        total_with_trainer = self._count_starters_with(
            Starter.trainer_id, first_date, last_date
        )

        # The guard above ensures both divisors here are non-zero.
        avg_races_per_meeting = total_races / len(meetings)
        avg_starters_per_race = total_starters / total_races

        if total_starters > 0:
            driver_completeness = total_with_driver / total_starters
            trainer_completeness = total_with_trainer / total_starters
        else:
            driver_completeness = 0
            trainer_completeness = 0

        return {
            "avg_races_per_meeting": round(avg_races_per_meeting, 2),
            "avg_starters_per_race": round(avg_starters_per_race, 2),
            "driver_assignment_rate": round(driver_completeness, 3),
            "trainer_assignment_rate": round(trainer_completeness, 3),
        }

437 

438 

def check_data_freshness(
    session: Session, max_age_days: int = 7
) -> ValidationIssue | None:
    """Report whether the most recent meeting is too old.

    Args:
        session: Database session
        max_age_days: Largest acceptable age, in days, of the newest meeting

    Returns:
        An "error" issue when no meeting exists, a "warning" issue when the
        newest meeting is more than ``max_age_days`` days before today, and
        None otherwise.
    """
    newest_first = session.query(Meeting).order_by(Meeting.meeting_date.desc())
    newest = newest_first.first()

    if not newest:
        return ValidationIssue(
            severity="error",
            category="completeness",
            message="No meetings found in database",
        )

    age = date.today() - newest.meeting_date
    days_old = age.days

    # Fresh enough: nothing to report.
    if days_old <= max_age_days:
        return None

    return ValidationIssue(
        severity="warning",
        category="completeness",
        message=f"Data may be stale: most recent meeting is {days_old} days old",
        meeting_id=newest.id,
        details={
            "latest_meeting_date": newest.meeting_date.isoformat(),
            "days_old": days_old,
        },
    )