Coverage for packages / core / common / data_quality.py: 66%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:37 +1200
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:37 +1200
1"""Data quality validation and monitoring for harness racing data."""
3from dataclasses import dataclass, field
4from datetime import date, datetime
6from sqlalchemy import func
7from sqlalchemy.orm import Session
9from packages.core.common.logging import get_logger
10from packages.core.storage.models import Meeting, Race, Starter
12logger = get_logger(__name__)
@dataclass
class ValidationIssue:
    """A single data quality finding produced by a validation check.

    Only ``severity``, ``category`` and ``message`` are required; the
    identifier fields default to ``None`` and ``details`` defaults to a fresh
    empty dict per instance.
    """

    # One of "error", "warning" or "info".
    severity: str
    # One of "placing", "missing_data", "suspicious" or "completeness".
    category: str
    # Human-readable description of the finding.
    message: str
    # Identifiers of the records the finding refers to, when applicable.
    race_id: int | None = None
    meeting_id: int | None = None
    starter_id: int | None = None
    # Free-form supporting data (counts, offending values, ...).
    details: dict = field(default_factory=dict)
@dataclass
class DataQualityReport:
    """Outcome of a data quality run over a date range.

    Carries the requested range, the totals that were examined, every issue
    that was raised and a dict of summary metrics. ``generated_at`` is filled
    in with ``datetime.now()`` when the report object is created.
    """

    start_date: date
    end_date: date
    total_meetings: int
    total_races: int
    total_starters: int
    issues: list[ValidationIssue] = field(default_factory=list)
    metrics: dict = field(default_factory=dict)
    generated_at: datetime = field(default_factory=datetime.now)

    @property
    def error_count(self) -> int:
        """Number of issues whose severity is ``"error"``."""
        return len([item for item in self.issues if item.severity == "error"])

    @property
    def warning_count(self) -> int:
        """Number of issues whose severity is ``"warning"``."""
        return len([item for item in self.issues if item.severity == "warning"])

    @property
    def has_errors(self) -> bool:
        """``True`` when at least one issue has severity ``"error"``."""
        return any(item.severity == "error" for item in self.issues)
class DataQualityValidator:
    """Validates data quality for harness racing data.

    Per-race checks (placings, completeness, suspicious results) operate on
    the objects passed in; report generation and metrics additionally query
    the database through ``self.session``.
    """

    # Thresholds used by the checks below; override in a subclass to tune.
    MIN_FIELD_SIZE = 3  # fewer starters than this is flagged
    MAX_DNF_RATE = 0.5  # DNF share above this is flagged
    MAX_HANDICAP_M = 100  # handicap (metres) above this is flagged
    MAX_PLACING_GAP = 2  # tolerated number of missing placings

    def __init__(self, session: Session):
        """Initialize validator.

        Args:
            session: Database session
        """
        self.session = session

    def validate_race(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Validate a single race and its starters.

        Runs the placing, completeness and suspicious-result checks, in that
        order, and concatenates their findings.

        Args:
            race: Race to validate
            starters: List of starters in the race

        Returns:
            List of validation issues found
        """
        issues: list[ValidationIssue] = []
        issues.extend(self._validate_placings(race, starters))
        issues.extend(self._validate_completeness(race, starters))
        issues.extend(self._detect_suspicious_results(race, starters))
        return issues

    def _validate_placings(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Validate the placing sequence of the finishers.

        Only starters with a placing that are not flagged as did-not-finish
        are considered. Checks:
        - At least one finisher exists (warning, and no further checks)
        - No duplicate placings (error)
        - First place exists (error)
        - The highest placing exceeds the finisher count by at most
          ``MAX_PLACING_GAP`` (warning otherwise)
        - Placings are positive (error)

        Args:
            race: Race instance
            starters: List of starters

        Returns:
            List of validation issues
        """
        issues: list[ValidationIssue] = []

        finished = [
            s for s in starters if s.placing is not None and not s.did_not_finish
        ]
        if not finished:
            issues.append(
                ValidationIssue(
                    severity="warning",
                    category="placing",
                    message="No finishers in race",
                    race_id=race.id,
                    details={"starter_count": len(starters)},
                )
            )
            return issues

        placings = [s.placing for s in finished]

        # Single pass duplicate detection; sorted so the report is stable.
        seen: set = set()
        repeated: set = set()
        for placing in placings:
            if placing in seen:
                repeated.add(placing)
            else:
                seen.add(placing)
        if repeated:
            duplicates = sorted(repeated)
            issues.append(
                ValidationIssue(
                    severity="error",
                    category="placing",
                    message=f"Duplicate placings found: {duplicates}",
                    race_id=race.id,
                    details={"duplicates": duplicates},
                )
            )

        sorted_placings = sorted(placings)

        if 1 not in seen:
            issues.append(
                ValidationIssue(
                    severity="error",
                    category="placing",
                    message="No first place finisher",
                    race_id=race.id,
                    details={"placings": sorted_placings},
                )
            )

        # How many placings are missing below the highest one recorded.
        max_gap = sorted_placings[-1] - len(sorted_placings)
        if max_gap > self.MAX_PLACING_GAP:
            issues.append(
                ValidationIssue(
                    severity="warning",
                    category="placing",
                    message=f"Large gap in placing sequence (gap={max_gap})",
                    race_id=race.id,
                    details={
                        "placings": sorted_placings,
                        "expected": list(range(1, len(placings) + 1)),
                    },
                )
            )

        issues.extend(
            ValidationIssue(
                severity="error",
                category="placing",
                message=f"Invalid placing value: {starter.placing}",
                race_id=race.id,
                starter_id=starter.id,
            )
            for starter in finished
            if starter.placing <= 0
        )

        return issues

    def _validate_completeness(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Validate data completeness.

        Checks:
        - All starters have horse_id (error)
        - Missing driver assignments (warning)
        - Missing trainer assignments (warning)
        - Missing barrier positions (warning)

        The id fields are treated as missing when falsy; the barrier only
        when it is ``None``.

        Args:
            race: Race instance
            starters: List of starters

        Returns:
            List of validation issues, one per check that found gaps
        """
        # (severity, label used in the message, number of affected starters)
        checks = (
            ("error", "horse_id", sum(1 for s in starters if not s.horse_id)),
            (
                "warning",
                "driver assignment",
                sum(1 for s in starters if not s.driver_id),
            ),
            (
                "warning",
                "trainer assignment",
                sum(1 for s in starters if not s.trainer_id),
            ),
            (
                "warning",
                "barrier position",
                sum(1 for s in starters if s.barrier is None),
            ),
        )
        return [
            ValidationIssue(
                severity=severity,
                category="missing_data",
                message=f"{count} starters missing {label}",
                race_id=race.id,
                details={"count": count},
            )
            for severity, label, count in checks
            if count > 0
        ]

    def _detect_suspicious_results(
        self, race: Race, starters: list[Starter]
    ) -> list[ValidationIssue]:
        """Detect suspicious or anomalous results.

        Checks:
        - Very few starters (< ``MIN_FIELD_SIZE``)
        - Too many DNFs (> ``MAX_DNF_RATE`` of the field)
        - Unusual handicap values (> ``MAX_HANDICAP_M``)

        Args:
            race: Race instance
            starters: List of starters

        Returns:
            List of validation issues
        """
        issues: list[ValidationIssue] = []
        field_size = len(starters)

        if field_size < self.MIN_FIELD_SIZE:
            issues.append(
                ValidationIssue(
                    severity="warning",
                    category="suspicious",
                    message=f"Very small field size: {field_size} starters",
                    race_id=race.id,
                    details={"starter_count": field_size},
                )
            )

        # Guard on field_size avoids a division by zero for empty races.
        if field_size > 0:
            dnf_count = sum(1 for s in starters if s.did_not_finish)
            dnf_rate = dnf_count / field_size
            if dnf_rate > self.MAX_DNF_RATE:
                issues.append(
                    ValidationIssue(
                        severity="warning",
                        category="suspicious",
                        message=(
                            f"High DNF rate: {dnf_rate:.1%} "
                            f"({dnf_count}/{field_size})"
                        ),
                        race_id=race.id,
                        details={"dnf_count": dnf_count, "dnf_rate": dnf_rate},
                    )
                )

        handicaps = [s.handicap_m for s in starters if s.handicap_m is not None]
        if handicaps:
            max_handicap = max(handicaps)
            if max_handicap > self.MAX_HANDICAP_M:
                issues.append(
                    ValidationIssue(
                        severity="info",
                        category="suspicious",
                        message=f"Unusually large handicap: {max_handicap}m",
                        race_id=race.id,
                        details={"max_handicap": max_handicap},
                    )
                )

        return issues

    def generate_report(self, from_date: date, to_date: date) -> DataQualityReport:
        """Generate comprehensive data quality report for date range.

        Loads every meeting in the range, validates each of its races and
        attaches summary metrics.

        Args:
            from_date: Start date (inclusive)
            to_date: End date (inclusive)

        Returns:
            DataQualityReport with all issues and metrics
        """
        logger.info(f"Generating data quality report from {from_date} to {to_date}")

        meetings = (
            self.session.query(Meeting)
            .filter(
                Meeting.meeting_date >= from_date,
                Meeting.meeting_date <= to_date,
            )
            .all()
        )

        all_issues: list[ValidationIssue] = []
        total_races = 0
        total_starters = 0

        for meeting in meetings:
            for race in meeting.races:
                starters = race.starters
                total_races += 1
                total_starters += len(starters)
                all_issues.extend(self.validate_race(race, starters))

        metrics = self._compute_metrics(meetings, total_races, total_starters)

        report = DataQualityReport(
            start_date=from_date,
            end_date=to_date,
            total_meetings=len(meetings),
            total_races=total_races,
            total_starters=total_starters,
            issues=all_issues,
            metrics=metrics,
        )

        logger.info(
            f"Data quality report complete: {report.error_count} errors, "
            f"{report.warning_count} warnings across {total_races} races"
        )

        return report

    def _count_starters_with(self, column, first_day: date, last_day: date) -> int:
        """Count starters with a non-NULL ``column`` in meetings within the dates."""
        count = (
            self.session.query(func.count(Starter.id))
            .join(Starter.race)
            .join(Race.meeting)
            .filter(
                Meeting.meeting_date >= first_day,
                Meeting.meeting_date <= last_day,
                column.isnot(None),
            )
            .scalar()
        )
        return count or 0

    def _compute_metrics(
        self, meetings: list[Meeting], total_races: int, total_starters: int
    ) -> dict:
        """Compute data quality metrics.

        Args:
            meetings: List of meetings (in any order)
            total_races: Total race count
            total_starters: Total starter count

        Returns:
            Dictionary of metrics; empty when there are no meetings or races
        """
        if not meetings or total_races == 0:
            return {}

        avg_races_per_meeting = total_races / len(meetings)
        avg_starters_per_race = total_starters / total_races

        if total_starters > 0:
            # The meetings list carries no ordering guarantee, so derive the
            # date window explicitly instead of trusting first/last elements.
            first_day = min(m.meeting_date for m in meetings)
            last_day = max(m.meeting_date for m in meetings)
            driver_completeness = (
                self._count_starters_with(Starter.driver_id, first_day, last_day)
                / total_starters
            )
            trainer_completeness = (
                self._count_starters_with(Starter.trainer_id, first_day, last_day)
                / total_starters
            )
        else:
            driver_completeness = 0
            trainer_completeness = 0

        return {
            "avg_races_per_meeting": round(avg_races_per_meeting, 2),
            "avg_starters_per_race": round(avg_starters_per_race, 2),
            "driver_assignment_rate": round(driver_completeness, 3),
            "trainer_assignment_rate": round(trainer_completeness, 3),
        }
def check_data_freshness(
    session: Session, max_age_days: int = 7
) -> ValidationIssue | None:
    """Report whether the newest meeting in the database is recent enough.

    Args:
        session: Database session
        max_age_days: Largest acceptable age, in days, of the newest meeting

    Returns:
        An error issue when no meeting exists, a warning issue when the
        newest meeting is more than ``max_age_days`` days before today, and
        ``None`` otherwise
    """
    newest_first = session.query(Meeting).order_by(Meeting.meeting_date.desc())
    latest_meeting = newest_first.first()

    if not latest_meeting:
        return ValidationIssue(
            severity="error",
            category="completeness",
            message="No meetings found in database",
        )

    age = date.today() - latest_meeting.meeting_date
    days_old = age.days

    # Fresh enough: nothing to report.
    if not days_old > max_age_days:
        return None

    stale_details = {
        "latest_meeting_date": latest_meeting.meeting_date.isoformat(),
        "days_old": days_old,
    }
    return ValidationIssue(
        severity="warning",
        category="completeness",
        message=f"Data may be stale: most recent meeting is {days_old} days old",
        meeting_id=latest_meeting.id,
        details=stale_details,
    )