Coverage for packages / ingest / streaming.py: 0%
23 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:14 +1200
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 08:14 +1200
1"""Streaming ingestion service for Kafka/Pub-Sub style data ingestion.
3Provides an asynchronous pipeline for consuming racing data from
4message brokers, enabling near-real-time rating updates.
5"""
7from __future__ import annotations
9from collections.abc import Callable
10from enum import StrEnum
11from typing import Any
14class MessageSource(StrEnum):
15 """Supported streaming message sources."""
17 KAFKA = "kafka"
18 REDIS_PUBSUB = "redis_pubsub"
19 RABBITMQ = "rabbitmq"
20 WEBHOOK = "webhook"
23class StreamingIngestionService:
24 """Ingests racing data via streaming message brokers.
26 Supports consuming race results, runner updates, and meeting
27 information from Kafka, Redis Pub/Sub, or other brokers for
28 near-real-time processing.
29 """
31 def __init__(
32 self,
33 source: MessageSource | str = MessageSource.KAFKA,
34 config: dict[str, Any] | None = None,
35 ) -> None:
36 """Initialize the streaming ingestion service.
38 Args:
39 source: Message source type.
40 config: Source-specific configuration dict (e.g.,
41 bootstrap_servers, topic, group_id for Kafka).
42 """
43 self.source = MessageSource(source)
44 self.config = config or {}
45 self._handlers: dict[str, list[Callable]] = {}
46 self._running = False
48 def register_handler(
49 self,
50 topic: str,
51 handler: Callable[[dict[str, Any]], None],
52 ) -> None:
53 """Register a handler function for a given topic.
55 Args:
56 topic: Message topic/channel to subscribe to.
57 handler: Callable that accepts a message payload dict.
58 """
59 self._handlers.setdefault(topic, []).append(handler)
61 async def start(self) -> None:
62 """Start consuming messages from the configured source.
64 Connects to the broker and begins the main event loop,
65 dispatching messages to registered handlers.
66 """
67 raise NotImplementedError
69 async def stop(self) -> None:
70 """Gracefully stop the ingestion service.
72 Closes broker connection and waits for in-flight messages
73 to complete processing.
74 """
75 raise NotImplementedError
77 async def publish(self, topic: str, message: dict[str, Any]) -> None:
78 """Publish a message to the streaming source.
80 Useful for re-publishing processed events for downstream
81 consumers (e.g., rating updates).
83 Args:
84 topic: Topic/channel to publish to.
85 message: Message payload.
86 """
87 raise NotImplementedError
89 @property
90 def is_running(self) -> bool:
91 """Whether the service is currently consuming messages."""
92 return self._running