Source code for livef1.models.session

# Standard Library Imports
from urllib.parse import urljoin
from typing import List, Dict
from time import time

# Third-Party Library Imports
import numpy as np
import pandas as pd

# Internal Project Imports
from ..adapters import livetimingF1_request, livetimingF1_getdata
from ..utils import helper
from ..utils.logger import logger
from ..data_processing.etl import *
from ..data_processing.data_models import *
from ..utils.constants import TOPICS_MAP, SILVER_SESSION_TABLES, TABLE_GENERATION_FUNCTIONS
from ..data_processing.lakes import DataLake

# Parallel-processing imports
import multiprocessing
from multiprocessing import Pool
from itertools import repeat


class Session:
    """
    Represents a Formula 1 session, containing methods to retrieve live timing
    data and process it.

    Attributes
    ----------
    season : :class:`~Season`
        The season the session belongs to.
    year : :class:`int`
        The year of the session.
    meeting : :class:`~Meeting`
        The meeting the session is part of.
    key : :class:`int`
        Unique identifier for the session.
    name : :class:`str`
        Name of the session.
    type : :class:`str`
        Type of the session (e.g., practice, qualifying, race).
    number : :class:`int`
        The session number.
    startdate : :class:`str`
        Start date and time of the session.
    enddate : :class:`str`
        End date and time of the session.
    gmtoffset : :class:`str`
        GMT offset for the session's timing.
    path : :class:`dict`
        Path information for accessing session data.
    loaded : :class:`bool`
        Indicates whether the session data has been loaded.
    """

    def __init__(
        self,
        season: "Season" = None,
        year: int = None,
        meeting: "Meeting" = None,
        key: int = None,
        name: str = None,
        type: str = None,
        number: int = None,
        startdate: str = None,
        enddate: str = None,
        gmtoffset: str = None,
        path: Dict = None,
        loaded: bool = False,
        **kwargs
    ):
        self.season = season
        self.loaded = loaded
        self.data_lake = DataLake(self)
        self.etl_parser = livef1SessionETL(session=self)  # Create an ETL parser for the session.

        # Silver tables start empty; .generate() populates them.
        for attr in SILVER_SESSION_TABLES:
            setattr(self, attr, None)

        # Set each truthy constructor argument as a lowercase instance attribute.
        for key, value in locals().items():
            if value:
                setattr(self, key.lower(), value)

        # Build the full path for accessing session data if a path was provided.
        if hasattr(self, "path"):
            self.full_path = helper.build_session_endpoint(self.path)
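    # A minimal construction sketch (values are hypothetical; in normal use the
    # library builds Session objects itself when a season or meeting is loaded):
    #
    #     session = Session(
    #         year=2024,
    #         name="Race",
    #         type="Race",
    #         path=some_path_dict,  # path info as provided by the meetings index
    #     )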
    def get_topic_names(self):
        """
        Retrieve information about available data topics for the session.

        This method fetches details about the available data topics for the session
        from the live timing feed and enriches the data with descriptions and keys
        from a predefined `TOPICS_MAP`.

        Returns
        -------
        :class:`dict`
            A dictionary containing information about available data topics. Each key
            represents a topic, and its value is another dictionary with the following keys:
                - `description` (str): A description of the topic.
                - `key` (str): A unique key identifying the topic.
                - Other metadata provided by the live timing feed.

        Notes
        -----
        - The data is fetched from a URL formed by appending `"Index.json"` to the session's `full_path`.
        - The fetched data is enriched with additional information from the `TOPICS_MAP` dictionary.
        - The `topic_names_info` attribute is set to the resulting dictionary for later use.

        Examples
        --------
        The returned dictionary would be:

        .. code-block:: json

            {
                "Topic1": {
                    "KeyFramePath": "Topic1.json",
                    "StreamPath": "Topic1.jsonStream",
                    "description": "Description for Topic1",
                    "key": "T1"
                },
                "Topic2": {
                    "KeyFramePath": "Topic2.json",
                    "StreamPath": "Topic2.jsonStream",
                    "description": "Description for Topic2",
                    "key": "T2"
                }
            }
        """
        logger.debug(f"Getting topic names for the session: {self.meeting.name}: {self.name}")
        self.topic_names_info = livetimingF1_request(urljoin(self.full_path, "Index.json"))["Feeds"]
        for topic in self.topic_names_info:
            self.topic_names_info[topic]["description"] = TOPICS_MAP[topic]["description"]
            self.topic_names_info[topic]["key"] = TOPICS_MAP[topic]["key"]
        return self.topic_names_info
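    # Sketch: fetch and inspect the enriched topic index (assumes `full_path`
    # was built from a supplied `path`):
    #
    #     topics = session.get_topic_names()
    #     for name, info in topics.items():
    #         print(name, info["key"], info["description"])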
    def print_topic_names(self):
        """
        Print the topic names and their descriptions.

        This method prints the key and description of each topic available in the
        `topic_names_info` attribute. If `topic_names_info` is not already
        populated, it fetches the data using the `get_topic_names` method.

        Notes
        -----
        - The method assumes the `topic_names_info` attribute is a dictionary where each
          key represents a topic, and its value is another dictionary containing
          `key` and `description`.
        - The `get_topic_names` method is called if `topic_names_info` is not already populated.

        Examples
        --------
        The output would be:

        .. code-block:: text

            T1 :
                Description for topic 1
            T2 :
                Description for topic 2
        """
        if not hasattr(self, "topic_names_info"):
            self.get_topic_names()

        logger.debug(f"Printing topic names and descriptions for the session: {self.meeting.name}: {self.name}")
        for topic in self.topic_names_info:
            print(self.topic_names_info[topic]["key"], ": \n\t", self.topic_names_info[topic]["description"])
    def load_data(
        self,
        dataNames,
        parallel: bool = False,
        dataType: str = "StreamPath",
        stream: bool = True
    ):
        """
        Retrieve and parse data from feeds, either sequentially or in parallel.

        Parameters
        ----------
        dataNames : Union[str, List[str]]
            Single data name or list of data names to retrieve.
        parallel : bool, optional
            Whether to load data in parallel (True) or sequentially (False), by default False.
        dataType : str, optional
            The type of the data to fetch, by default "StreamPath".
        stream : bool, optional
            Whether to fetch as stream, by default True.

        Returns
        -------
        Union[BasicResult, dict]
            If a single data name is provided: a BasicResult object with parsed data.
            If multiple data names are provided: a dictionary mapping names to BasicResult objects.

        Notes
        -----
        - Parallel loading uses a multiprocessing Pool with max(1, CPU count - 1) processes.
        - Saves all loaded data to the bronze lake before returning.
        - Returns the same format as the input: a single result for str input, a dict for list input.
        """
        # Ensure topic names are loaded
        if not hasattr(self, "topic_names_info"):
            self.get_topic_names()

        # Handle single data name case
        single_input = isinstance(dataNames, str)
        if single_input:
            dataNames = [dataNames]

        # Resolve short keys (e.g. "T1") to their full topic names
        validated_names = []
        for name in dataNames:
            for topic in self.topic_names_info:
                if self.topic_names_info[topic]["key"] == name:
                    name = topic
                    break
            validated_names.append(name)

        results = {}
        if parallel and len(validated_names) > 1:
            # Parallel loading
            n_processes = max(1, multiprocessing.cpu_count() - 1)
            with Pool(processes=n_processes) as pool:
                loaded_results = pool.starmap(load_single_data, zip(validated_names, repeat(self)))
            results = {name: result for name, result in loaded_results}
        else:
            # Sequential loading
            for name in validated_names:
                name, res = load_single_data(name, self)
                results[name] = res

        # Save all results to the bronze lake
        for name, result in results.items():
            self.data_lake.put(level="bronze", data_name=name, data=result)
            logger.debug(f"'{name}' has been saved to the bronze lake.")

        # Return a single result or a dict based on the input type
        if single_input:
            return self.data_lake.get(level="bronze", data_name=validated_names[0])
        return {name: self.data_lake.get(level="bronze", data_name=name) for name in validated_names}
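    # Sketch: load two topics in parallel; the returned dict maps validated
    # topic names to BasicResult objects read back from the bronze lake
    # (topic names here are illustrative):
    #
    #     results = session.load_data(["CarData.z", "Position.z"], parallel=True)
    #     car_data = results["CarData.z"]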
    def get_data(
        self,
        dataNames,
        parallel: bool = False,
        force: bool = False
    ):
        """
        Retrieve one or multiple data topics from cache or load them, with optional parallel processing.

        Parameters
        ----------
        dataNames : Union[str, List[str]]
            Single data topic name or list of data topic names to retrieve.
        parallel : bool, optional
            Whether to use parallel processing when fetching multiple topics. Defaults to False.
        force : bool, optional
            Whether to force a download even if the data exists in the cache. Defaults to False.

        Returns
        -------
        Union[BasicResult, Dict[str, BasicResult]]
            If a single topic is requested, returns its BasicResult object.
            If multiple topics are requested, returns a dictionary mapping topic names
            to their BasicResult objects.

        Examples
        --------
        # Get a single topic
        >>> telemetry = session.get_data("CarData.z")

        # Get multiple topics in parallel
        >>> data = session.get_data(["CarData.z", "Position.z", "SessionStatus"], parallel=True)

        # Get multiple topics sequentially (the default)
        >>> data = session.get_data(["CarData.z", "Position.z"])

        # Force a download even if the data is cached
        >>> data = session.get_data("CarData.z", force=True)

        Notes
        -----
        - Automatically handles both single and multiple data requests.
        - Checks the cache (data lake) before loading new data unless force=True.
        - Uses parallel processing for multiple topics when parallel=True.
        - Returns the same format as the input: a single result for str input, a dict for list input.
        """
        # Ensure topic names are loaded
        if not hasattr(self, "topic_names_info"):
            self.get_topic_names()

        # Handle single data name case
        single_input = isinstance(dataNames, str)
        dataNames = [dataNames] if single_input else dataNames

        # Validate all data names
        validated_names = [self.check_data_name(name) for name in dataNames]

        # Check the cache and identify topics to load
        to_load = []
        results = {}
        for name in validated_names:
            if not force and name in self.data_lake.raw:
                logger.debug(f"'{name}' found in lake, using cached version")
                results[name] = self.data_lake.get(level="bronze", data_name=name)
            else:
                to_load.append(name)

        if to_load:
            # Load new data using load_data with the parallel option
            loaded_results = self.load_data(
                dataNames=to_load,
                parallel=parallel and len(to_load) > 1
            )

            if isinstance(loaded_results, dict):
                results.update(loaded_results)
            else:
                # Handle single result case
                results[to_load[0]] = loaded_results

        # Return a single result if single input, otherwise return a dictionary
        return results[validated_names[0]] if single_input else results
    def check_data_name(self, dataName: str):
        """
        Validate and return the correct data name.

        This method checks if the provided data name matches a key in the
        `topic_names_info` attribute. If it does, it returns the corresponding
        topic name; otherwise the name is returned unchanged.

        Parameters
        ----------
        dataName : :class:`str`
            The name of the data topic to validate.

        Returns
        -------
        :class:`str`
            The validated data name.
        """
        if not hasattr(self, "topic_names_info"):
            self.get_topic_names()

        for topic in self.topic_names_info:
            if self.topic_names_info[topic]["key"] == dataName:
                dataName = topic
                break
        return dataName
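    # Sketch: a short key resolves to its full topic name, while a name that is
    # already a topic (or is unknown) passes through unchanged:
    #
    #     session.check_data_name("CarData.z")  # -> "CarData.z"
    #     session.check_data_name("T1")         # -> the topic mapped to key "T1"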
    def get_laps(self):
        """
        Retrieve the laps data.

        This method returns the laps data if it has been generated. If not, it
        logs an informational message indicating that the laps table is not
        generated yet.

        Returns
        -------
        :class:`~Laps` or None
            The laps data if available, otherwise None.
        """
        if self.laps is not None:
            return self.laps
        else:
            logger.info("Laps table is not generated yet. Use .generate() to load required data and generate silver tables.")
            return None
    def get_car_telemetry(self):
        """
        Retrieve the car telemetry data.

        This method returns the car telemetry data if it has been generated. If
        not, it logs an informational message indicating that the car telemetry
        table is not generated yet.

        Returns
        -------
        :class:`~CarTelemetry` or None
            The car telemetry data if available, otherwise None.
        """
        if self.carTelemetry is not None:
            return self.carTelemetry
        else:
            logger.info("Car Telemetry table is not generated yet. Use .generate() to load required data and generate silver tables.")
            return None
    def get_weather(self):
        """
        Retrieve the weather data.

        This method is not implemented yet. Once it is, it will return the
        weather data if it has been generated, and log an informational message
        otherwise.

        Returns
        -------
        :class:`~Weather` or None
            The weather data if available, otherwise None.
        """
        logger.error(".get_weather() is not implemented yet.")
        # if self.weather is not None:
        #     return self.weather
        # else:
        #     logger.info("Weather table is not generated yet. Use .generate() to load required data and generate silver tables.")
        #     return None
    def get_timing(self):
        """
        Retrieve the timing data.

        This method is not implemented yet. Once it is, it will return the
        timing data if it has been generated, and log an informational message
        otherwise.

        Returns
        -------
        :class:`~Timing` or None
            The timing data if available, otherwise None.
        """
        logger.error(".get_timing() is not implemented yet.")
        # if self.timing is not None:
        #     return self.timing
        # else:
        #     logger.info("Timing table is not generated yet. Use .generate() to load required data and generate silver tables.")
        #     return None

    def _get_first_datetime(self):
        # Recover each feed's wall-clock epoch as (Utc - stream timestamp) and
        # keep the latest estimate across the position and car data feeds.
        pos_df = self.get_data("Position.z")
        car_df = self.get_data("CarData.z")
        first_date = np.amax([
            (helper.to_datetime(car_df["Utc"]) - pd.to_timedelta(car_df["timestamp"])).max(),
            (helper.to_datetime(pos_df["Utc"]) - pd.to_timedelta(pos_df["timestamp"])).max()
        ])
        return first_date

    def _get_session_start_time(self):
        # Session start as a timedelta offset into the stream, read from the
        # "Started" row of the SessionStatus feed.
        return pd.to_timedelta(self.get_data(dataNames="SessionStatus").set_index("status").loc["Started"].timestamp)
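    # Worked sketch of the epoch arithmetic above, with made-up numbers: if a
    # sample arrives at Utc 13:02:10 with stream timestamp 00:02:10, the feed's
    # epoch is 13:02:10 - 00:02:10 = 13:00:00; the max over both feeds becomes
    # `first_datetime`, and adding the "Started" offset gives the session start
    # in wall-clock time:
    #
    #     epoch = pd.to_datetime("2024-06-09 13:02:10") - pd.to_timedelta("00:02:10")
    #     start = epoch + pd.to_timedelta("00:05:00")  # hypothetical "Started" offset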
    def generate(self, silver=True, gold=False):
        """
        Load the data required for the requested table level(s) and generate them.

        Parameters
        ----------
        silver : bool, optional
            Whether to generate the silver tables, by default True.
        gold : bool, optional
            Whether to generate the gold tables (not implemented yet), by default False.
        """
        required_data = {"CarData.z", "Position.z", "SessionStatus"}

        tables_to_generate = set()
        if silver:
            tables_to_generate.update(SILVER_SESSION_TABLES)
        if gold:
            tables_to_generate.update(GOLD_SESSION_TABLES)

        for table_name in tables_to_generate:
            required_data.update(TABLE_REQUIREMENTS[table_name])

        # Load all required topics through the unified get_data method, in parallel.
        self.get_data(list(required_data), parallel=True)

        self.first_datetime = self._get_first_datetime()
        self.session_start_time = self._get_session_start_time()
        self.session_start_datetime = self.first_datetime + self.session_start_time

        if silver:
            logger.info("Silver tables are being generated.")
            for table_name in SILVER_SESSION_TABLES:
                if table_name in TABLE_GENERATION_FUNCTIONS:
                    setattr(self, table_name, self.data_lake.silver_lake.generate_table(table_name))
                    logger.info(f"'{table_name}' has been generated and saved to the silver lake. You can access it from 'session.{table_name}'.")

        if gold:
            logger.info("Gold tables are not implemented yet.")
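    # Sketch: load the required feeds and build the silver tables, then read
    # one of them through its accessor:
    #
    #     session.generate(silver=True)
    #     laps = session.get_laps()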
    def generate_laps_table(self):
        self.laps = self.data_lake.silver_lake.generate_table("laps")
    def generate_car_telemetry_table(self):
        # "car_telemetry" is assumed to be the silver table key here; the
        # original generated the "laps" table, which looks like a copy-paste slip.
        self.car_telemetry = self.data_lake.silver_lake.generate_table("car_telemetry")
def load_single_data(dataName, session):
    dataType = "StreamPath"
    stream = True

    logger.info(f"Fetching data : '{dataName}'")
    start = time()
    data = livetimingF1_getdata(
        urljoin(session.full_path, session.topic_names_info[dataName][dataType]),
        stream=stream
    )
    logger.debug(f"Fetched in {round(time() - start, 3)} seconds")

    # Parse the retrieved data using the ETL parser and return the result.
    start = time()
    res = BasicResult(
        data=list(session.etl_parser.unified_parse(dataName, data))
    )
    logger.debug(f"Parsed in {round(time() - start, 3)} seconds")
    logger.info(f"'{dataName}' has been fetched and parsed")

    # Saving to the bronze lake is handled by the caller (Session.load_data).
    return dataName, res
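# Sketch: `load_single_data` lives at module level so multiprocessing can
# pickle it; it can also be called directly for a one-off fetch (topic name
# illustrative):
#
#     name, result = load_single_data("SessionStatus", session)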