from ..utils.helper import *
from ..utils.constants import channel_name_map
[docs]
def parse_tyre_stint_series(data, sessionKey):
"""
Parses the tyre stint series data, generating records for each stint.
Parameters
----------
data : :class:`dict`
The tyre stint series data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, driver number, pit count, and other stint-related info.
"""
for key, value in data:
#data.items()::
for driver_no, stint in value["Stints"].items():
if stint:
for pit_count, current_info in stint.items():
record = {
"session_key": sessionKey,
"timestamp": key,
"DriverNo": driver_no,
"PitCount": pit_count,
**current_info
}
yield record
[docs]
def parse_driver_race_info(data, sessionKey):
"""
Parses driver race info data.
Parameters
----------
data : :class:`dict`
The driver race info data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, driver number, and other race-related info.
"""
for key, value in data:
#data.items()::
for driver_no, info in value.items():
record = {
"session_key": sessionKey,
"timestamp": key,
"DriverNo": driver_no,
**info
}
yield record
[docs]
def parse_current_tyres(data, sessionKey):
"""
Parses current tyre data for each driver.
Parameters
----------
data : :class:`dict`
The current tyre data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, driver number, and tyre-related info.
"""
for key, value in data:
#data.items()::
for driver_no, info in value["Tyres"].items():
record = {
"session_key": sessionKey,
"timestamp": key,
"DriverNo": driver_no,
**info
}
yield record
[docs]
def parse_driver_list(data, sessionKey):
"""
Parses the driver list data.
Parameters
----------
data : :class:`dict`
The driver list data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, driver number, and driver-related info.
"""
for driver_no, info in data:
#data.items()::
record = {
"session_key": sessionKey,
"DriverNo": driver_no,
**info
}
yield record
[docs]
def parse_session_data(data, sessionKey):
"""
Parses session data for each driver.
Parameters
----------
data : :class:`dict`
The session data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key and session-related info.
"""
for key, value in data:
#data.items()::
for driver_no, info in value.items():
try:
record = {
"session_key": sessionKey,
**list(info.values())[0]
}
yield record
except Exception as e:
pass
[docs]
def parse_timing_data(data, sessionKey):
"""
Parses timing data for each driver.
Parameters
----------
data : :class:`dict`
The timing data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, driver number, and various timing metrics.
"""
def parse_helper(info, record, prefix=""):
"""
Recursively parses nested dictionaries in the timing data.
"""
for info_k, info_v in info.items():
if isinstance(info_v, list):
record = {**record, **{f"{info_k}_{sector_no+1}_{k}": v for sector_no in range(len(info_v)) for k, v in info_v[sector_no].items()}}
elif isinstance(info_v, dict):
record = parse_helper(info_v, record, prefix=prefix + info_k + "_")
else:
record = {**record, **{prefix + info_k: info_v}}
return record
for ts, value in data:
#data.items()::
if "Withheld" in value.keys():
withTheId = value["Withheld"]
else:
withTheId = None
for driver_no, info in value["Lines"].items():
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"DriverNo": driver_no
}
record = parse_helper(info, record)
yield record
[docs]
def parse_lap_series(data, sessionKey):
"""
Parses lap series data for each driver.
Parameters
----------
data : :class:`dict`
The lap series data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, driver number, lap number, and lap position.
"""
for ts, ts_value in data:
#data.items()::
for driver_no, driver_data in ts_value.items():
if isinstance(driver_data["LapPosition"], list):
for position in driver_data["LapPosition"]:
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"DriverNo": driver_no,
"Lap": 0,
"LapPosition": position
}
yield record
elif isinstance(driver_data["LapPosition"], dict):
for lap, position in driver_data["LapPosition"].items():
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"DriverNo": driver_no,
"Lap": lap,
"LapPosition": position
}
yield record
[docs]
def parse_top_three(data, sessionKey):
"""
Parses the top three drivers' data.
Parameters
----------
data : :class:`dict`
The top three data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, driver position, and related info.
"""
for ts, ts_value in data:
#data.items()::
if "Withheld" in ts_value.keys():
continue
for position, info in ts_value["Lines"].items():
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"DriverAtPosition": position,
**info
}
yield record
[docs]
def parse_session_status(data, sessionKey):
"""
Parses the session status data.
Parameters
----------
data : :class:`dict`
The session status data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, and session status.
"""
for ts, ts_value in data:
#data.items()::
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"status": ts_value["Status"]
}
yield record
[docs]
def parse_hearthbeat(data, sessionKey):
"""
Parses the heartbeat data.
Parameters
----------
data : :class:`dict`
The heartbeat data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, and UTC time.
"""
for ts, ts_value in data:
#data.items()::
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"utc": ts_value["Utc"]
}
yield record
[docs]
def parse_weather_data(data, sessionKey):
"""
Parses weather data for the session.
Parameters
----------
data : :class:`dict`
The weather data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, and weather-related information.
"""
for ts, ts_value in data:
#data.items()::
record = {
"SessionKey": sessionKey,
"timestamp": ts,
**ts_value
}
yield record
[docs]
def parse_team_radio(data, sessionKey):
"""
Parses team radio data.
Parameters
----------
data : :class:`dict`
The team radio data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, and captured radio messages.
"""
for ts, ts_value in data:
#data.items()::
record = {
"SessionKey": sessionKey,
"timestamp": ts
}
if isinstance(ts_value["Captures"], list):
for capture in ts_value["Captures"]:
capture_record = {
**record,
**capture
}
yield capture_record
elif isinstance(ts_value["Captures"], dict):
for capture in ts_value["Captures"].values():
capture_record = {
**record,
**capture
}
yield capture_record
[docs]
def parse_tlarcm(data, sessionKey):
"""
Parses TLA RCM (Track Location Allocation Race Control Messages) data.
Parameters
----------
data : :class:`dict`
The TLA RCM data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, and the message content.
"""
for ts, ts_value in data:
#data.items()::
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"Message": ts_value["Message"]
}
yield record
[docs]
def parse_race_control_messages(data, sessionKey):
"""
Parses race control messages.
Parameters
----------
data : :class:`dict`
The race control messages data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, and message details.
"""
for ts, ts_value in data:
#data.items()::
record = {
"SessionKey": sessionKey,
"timestamp": ts
}
if isinstance(ts_value["Messages"], list):
for capture in ts_value["Messages"]:
capture_record = {
**record,
**capture
}
yield capture_record
elif isinstance(ts_value["Messages"], dict):
for capture in ts_value["Messages"].values():
capture_record = {
**record,
**capture
}
yield capture_record
[docs]
def parse_session_info(data, sessionKey):
"""
Parses general session information.
Parameters
----------
data : :class:`dict`
The session information data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, and session-related information.
"""
for ts, value in data:
#data.items()::
if "Withheld" in value.keys():
withTheId = value["Withheld"]
else:
withTheId = None
record = {
"SessionKey": sessionKey,
"timestamp": ts
}
record = parse_helper_for_nested_dict(value, record)
yield record
[docs]
def parse_position_z(data, sessionKey):
"""
Parses driver position (z-axis) data.
Parameters
----------
data : :class:`dict`
The driver position data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, UTC time, driver number, and z-axis position data.
"""
for ts, v in data:
#data.items()::
parsed_entry = parse(v, zipped=True)
for position_entry in parsed_entry["Position"]:
utc = position_entry["Timestamp"]
for driver_entry in position_entry["Entries"].items():
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"Utc": utc,
"DriverNo": driver_entry[0],
**driver_entry[1]
}
yield record
[docs]
def parse_car_data_z(data, sessionKey):
"""
Parses car data (z-axis) for each driver.
Parameters
----------
data : :class:`dict`
The car data.
sessionKey : :class:`int`
The key of the current session.
Yields
----------
dict :
A record containing the session key, timestamp, UTC time, driver number, and channel data.
"""
if isinstance(data, list):
for ts, v in data:
#data.items()::
parsed_entry = parse(v, zipped=True)
for entry in parsed_entry["Entries"]:
utc = entry["Utc"]
for driver_entry in entry["Cars"].items():
ch = driver_entry[1]["Channels"]
record = {
"SessionKey": sessionKey,
"timestamp": ts,
"Utc": utc,
"DriverNo": driver_entry[0],
**{channel_name_map[k]:v for k,v in ch.items()}
}
yield record
elif isinstance(data, str):
parsed_entry = parse(data, zipped=True)
for entry in parsed_entry["Entries"]:
utc = entry["Utc"]
for driver_entry in entry["Cars"].items():
record = {
"SessionKey": sessionKey,
"timestamp": None,
"Utc": utc,
"DriverNo": driver_entry[0],
**{channel_name_map[k]:v for k,v in ch.items()}
}
yield record
[docs]
def parse_pit_lane_time(data, sessionKey):
for key, value in data:
#data.items()::
if "_deleted" in value["PitTimes"].keys():
for deleted_driver in value["PitTimes"]["_deleted"]:
record = {
"session_key": 0,
"timestamp": key,
"_deleted": deleted_driver
}
yield record
else:
for driver_no, info in value["PitTimes"].items():
record = {
"session_key": 0,
"timestamp": key,
**info
}
yield record
[docs]
def parse_basic(data, sessionKey):
for key, info in data:
#data.items()::
record = {
"session_key": 0,
"timestamp": key,
**info
}
yield record