Source code for livef1.adapters.realtime_client

import asyncio
import concurrent
import requests
import inspect
import random

import time
import json
from urllib.parse import urljoin

from ..data_processing.etl import function_map
from .signalr_aio._connection import Connection
from ..utils.logger import logger
from ..utils.constants import (
    DEFAULT_METHOD,
    BASE_URL,
    SIGNALR_ENDPOINT,
    REALTIME_CALLBACK_DEFAULT_PARAMETERS
    )
from ..utils.exceptions import (
    ArgumentError,
    ParsingError
)

[docs] class RealF1Client: """ A client for managing real-time Formula 1 data streaming. Attributes ---------- topics : list List of topics to subscribe to for receiving live data. headers : dict HTTP headers used for the connection. _connection_url : str URL for the SignalR connection. _log_file_name : str Path to the log file. _log_file_mode : str Mode for opening the log file. _test : bool Indicates if the client is in test mode. _log_file : file object Log file object for writing messages (used in test mode). _handlers : dict Mapping of methods to their respective handlers. Parameters ---------- topics : str or list Topic(s) to subscribe to for live updates. log_file_name : str, optional Name of the log file (default is None). log_file_mode : str, optional Mode for opening the log file (default is "w"). """ def __init__( self, topics, log_file_name = None, log_file_mode = "w", ): self._connection_url = urljoin(BASE_URL, SIGNALR_ENDPOINT) self.headers = { 'User-agent': 'BestHTTP', 'Accept-Encoding': 'gzip, identity', 'Connection': 'keep-alive, Upgrade'} if isinstance(topics, str): self.topics = [topics] elif isinstance(topics, list): self.topics = topics else: raise ArgumentError("You need to give list of topics you want to subscribe") self._log_file_name = log_file_name self._log_file_mode = log_file_mode self._handlers = {} if self._log_file_name: self._log_file = open(self._log_file_name, self._log_file_mode) @self.callback("default logger") async def print_callback( records ): for record in records: await self._file_logger(record) def _create_session(self): """ Create an HTTP session with the required headers. Returns ------- requests.Session A configured HTTP session. """ session = requests.Session() session.headers = self.headers return session async def _on_message(self, msg): """ Handle incoming messages asynchronously. Parameters ---------- msg : dict The incoming message to process. """ self._t_last_message = time.time() loop = asyncio.get_running_loop() try: with concurrent.futures.ThreadPoolExecutor() as pool: await loop.run_in_executor( pool, print, str(msg) ) except Exception as e: raise RealF1Error(e) async def _file_logger(self, msg): """ Log incoming messages to a file. Parameters ---------- msg : dict The incoming message to log. """ if msg != {} and msg: self._log_file.write(str(msg) + '\n') self._log_file.flush()
[docs] def on_message(self, method, handler): """ Register a handler for a specific method. Parameters ---------- method : str The method to handle. handler : callable The function to handle the method. """ if method not in self._handlers: func = MessageHandlerTemplate(handler).get self._handlers[method] = func
[docs] def callback(self, method): """ Decorator to register a callback function for a specific method. This decorator allows you to associate a callback function with a particular method. The function being registered must have arguments matching the required parameters defined in `REALTIME_CALLBACK_DEFAULT_PARAMETERS`. Raises ------ TypeError If the provided callback function does not have the required arguments. Notes ----- - The `REALTIME_CALLBACK_DEFAULT_PARAMETERS` is a predefined list of parameter names that the callback function must include. - Once the function is successfully registered, it will handle messages for the specified method. Examples -------- Registering a callback for a method: .. code-block:: python from livef1.adapters.realtime_client import RealF1Client client = RealF1Client( topics = ["CarData.z", "SessionInfo"], log_file_name="./output.json" ) @client.callback("new one") async def print_callback(records): # records argument have to be set print(records) # or you can do whatever you want with incoming data client.run() """ def inner(func): # Check if the provided function has the required arguments has_args = set(REALTIME_CALLBACK_DEFAULT_PARAMETERS) == set(inspect.signature(func).parameters.keys()) args_diff = set(REALTIME_CALLBACK_DEFAULT_PARAMETERS).difference(set(inspect.signature(func).parameters.keys())) if not has_args: raise ArgumentError(f"The provided callback function does not have following required arguments. {args_diff}") else: # Register the function as a handler for the given method self.on_message(method,func) logger.debug(f"Custom callback method with '{method}' has successfully inserted.") return func return inner
[docs] def run(self): """ Start the client in asynchronous mode. """ self._async_engine_run()
def _async_engine_run(self): """ Execute the asynchronous engine. """ try: asyncio.run(self._async_run()) except KeyboardInterrupt: logger.info("Keyboard interrupt - exiting...") async def _async_run(self): """ Run the client asynchronously. """ logger.info(f"Starting LiveF1 live timing client") await asyncio.gather( asyncio.ensure_future(self._forever_check()), asyncio.ensure_future(self._run()) ) logger.info("Exiting...") async def _forever_check(self): """ Keep the client running indefinitely. """ while True: await asyncio.sleep(1) def _sync_engine_run(self): pass def _sync_engine(self): pass async def _run(self): """ Set up the SignalR connection and register handlers. """ # Create connection self._connection = Connection(self._connection_url, session=self._create_session()) # Register hub hub = self._connection.register_hub('Streaming') # Set default message handler for method, handler in self._handlers.items(): hub.client.on(method, handler) # Subscribe topics in interest hub.server.invoke("Subscribe", self.topics) # Start the client loop = asyncio.get_event_loop() executor = concurrent.futures.ThreadPoolExecutor() await loop.run_in_executor(executor, self._connection.start)
[docs] class MessageHandlerTemplate: """ A template for handling incoming SignalR messages. This class serves as a message handler for SignalR streams, where incoming messages are processed and passed to a user-defined function. Parameters ---------- func : callable A user-defined asynchronous function that processes the parsed records from incoming SignalR messages. The function must accept the processed records as its input. """ def __init__( self, func ): self._func = func
[docs] async def get(self, msg): """ Process incoming messages and invoke the handler function. The method handles two types of incoming message formats: - Messages in the "R" key: Representing topics with associated data. - Messages in the "M" key: Representing method calls with data payloads. For each message, the method uses a `function_map` to parse the data and sends the resulting records to the user-defined handler function. Parameters ---------- msg : dict The incoming message in SignalR format. It is expected to contain either: - "R": A dictionary where the keys represent topic names and the values are the associated data for that topic. - "M": A list of dictionaries, where each dictionary contains: - "M": The method name. - "A": A list of message components including topic name, data, and timestamp. """ batch = msg if ("R" in batch.keys()) or (batch.get("M") and batch.get("M") != []): if batch.get("R"): for key in batch.get("R").keys(): try: topic_name = key data = batch.get("R")[key] timestamp = None records = list(function_map[topic_name]({timestamp: data}, None)) await self._func(records) # await self._func( # topic_name = key, # data = batch.get("R")[key], # timestamp = None) except Exception as e: raise ParsingError(e) elif batch.get("M"): for data in batch.get("M"): method = data["M"] message = data["A"] topic_name = message[0] data = message[1] timestamp = message[2] records = list(function_map[topic_name]({timestamp: data}, None)) await self._func(records)
# await self._func( # topic_name = message[0], # data = message[1], # timestamp = message[2])