
    vi
                    b    S r SSKJr  SSKJr  SSKJr  SSKJr   " S S\5      r	 " S S	5      r
g
)zStreaming ingestion service for Kafka/Pub-Sub style data ingestion.

Provides an asynchronous pipeline for consuming racing data from
message brokers, enabling near-real-time rating updates.
    )annotations)Callable)StrEnum)Anyc                  (    \ rS rSrSrSrSrSrSrSr	g)	MessageSource   z$Supported streaming message sources.kafkaredis_pubsubrabbitmqwebhook N)
__name__
__module____qualname____firstlineno____doc__KAFKAREDIS_PUBSUBRABBITMQWEBHOOK__static_attributes__r       >/root/tipsharks/tipsharks-elo-api/packages/ingest/streaming.pyr   r      s    .E!LHGr   r   c                      \ rS rSrSr\R                  S4     SS jjr      SS jrSS jr	SS jr
SS jr\SS	 j5       rS
rg)StreamingIngestionService   zIngests racing data via streaming message brokers.

Supports consuming race results, runner updates, and meeting
information from Kafka, Redis Pub/Sub, or other brokers for
near-real-time processing.
Nc                `    [        U5      U l        U=(       d    0 U l        0 U l        SU l        g)zInitialize the streaming ingestion service.

Args:
    source: Message source type.
    config: Source-specific configuration dict (e.g.,
        bootstrap_servers, topic, group_id for Kafka).
FN)r   sourceconfig	_handlers_running)selfr   r    s      r   __init__"StreamingIngestionService.__init__   s)     $F+l46r   c                Z    U R                   R                  U/ 5      R                  U5        g)zRegister a handler function for a given topic.

Args:
    topic: Message topic/channel to subscribe to.
    handler: Callable that accepts a message payload dict.
N)r!   
setdefaultappend)r#   topichandlers      r   register_handler*StreamingIngestionService.register_handler0   s#     	!!%,33G<r   c                   #    [         e7f)zStart consuming messages from the configured source.

Connects to the broker and begins the main event loop,
dispatching messages to registered handlers.
NotImplementedErrorr#   s    r   startStreamingIngestionService.start=         "!   	c                   #    [         e7f)zzGracefully stop the ingestion service.

Closes broker connection and waits for in-flight messages
to complete processing.
r.   r0   s    r   stopStreamingIngestionService.stopE   r3   r4   c                   #    [         e7f)zPublish a message to the streaming source.

Useful for re-publishing processed events for downstream
consumers (e.g., rating updates).

Args:
    topic: Topic/channel to publish to.
    message: Message payload.
r.   )r#   r)   messages      r   publish!StreamingIngestionService.publishM   s      "!r4   c                    U R                   $ )z4Whether the service is currently consuming messages.)r"   r0   s    r   
is_running$StreamingIngestionService.is_runningY   s     }}r   )r!   r"   r    r   )r   zMessageSource | strr    zdict[str, Any] | NonereturnNone)r)   strr*   z Callable[[dict[str, Any]], None]r?   r@   )r?   r@   )r)   rA   r9   zdict[str, Any]r?   r@   )r?   bool)r   r   r   r   r   r   r   r$   r+   r1   r6   r:   propertyr=   r   r   r   r   r   r      sz     '4&9&9(,# & 
	"== 2= 
	=""
"  r   r   N)r   
__future__r   collections.abcr   enumr   typingr   r   r   r   r   r   <module>rH      s0    # $  G E Er   