Coverage for packages / ingest / streaming.py: 0%

23 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 08:14 +1200

1"""Streaming ingestion service for Kafka/Pub-Sub style data ingestion. 

2 

3Provides an asynchronous pipeline for consuming racing data from 

4message brokers, enabling near-real-time rating updates. 

5""" 

6 

7from __future__ import annotations 

8 

9from collections.abc import Callable 

10from enum import StrEnum 

11from typing import Any 

12 

13 

14class MessageSource(StrEnum): 

15 """Supported streaming message sources.""" 

16 

17 KAFKA = "kafka" 

18 REDIS_PUBSUB = "redis_pubsub" 

19 RABBITMQ = "rabbitmq" 

20 WEBHOOK = "webhook" 

21 

22 

23class StreamingIngestionService: 

24 """Ingests racing data via streaming message brokers. 

25 

26 Supports consuming race results, runner updates, and meeting 

27 information from Kafka, Redis Pub/Sub, or other brokers for 

28 near-real-time processing. 

29 """ 

30 

31 def __init__( 

32 self, 

33 source: MessageSource | str = MessageSource.KAFKA, 

34 config: dict[str, Any] | None = None, 

35 ) -> None: 

36 """Initialize the streaming ingestion service. 

37 

38 Args: 

39 source: Message source type. 

40 config: Source-specific configuration dict (e.g., 

41 bootstrap_servers, topic, group_id for Kafka). 

42 """ 

43 self.source = MessageSource(source) 

44 self.config = config or {} 

45 self._handlers: dict[str, list[Callable]] = {} 

46 self._running = False 

47 

48 def register_handler( 

49 self, 

50 topic: str, 

51 handler: Callable[[dict[str, Any]], None], 

52 ) -> None: 

53 """Register a handler function for a given topic. 

54 

55 Args: 

56 topic: Message topic/channel to subscribe to. 

57 handler: Callable that accepts a message payload dict. 

58 """ 

59 self._handlers.setdefault(topic, []).append(handler) 

60 

61 async def start(self) -> None: 

62 """Start consuming messages from the configured source. 

63 

64 Connects to the broker and begins the main event loop, 

65 dispatching messages to registered handlers. 

66 """ 

67 raise NotImplementedError 

68 

69 async def stop(self) -> None: 

70 """Gracefully stop the ingestion service. 

71 

72 Closes broker connection and waits for in-flight messages 

73 to complete processing. 

74 """ 

75 raise NotImplementedError 

76 

77 async def publish(self, topic: str, message: dict[str, Any]) -> None: 

78 """Publish a message to the streaming source. 

79 

80 Useful for re-publishing processed events for downstream 

81 consumers (e.g., rating updates). 

82 

83 Args: 

84 topic: Topic/channel to publish to. 

85 message: Message payload. 

86 """ 

87 raise NotImplementedError 

88 

89 @property 

90 def is_running(self) -> bool: 

91 """Whether the service is currently consuming messages.""" 

92 return self._running