import pandas as pd
import numpy as np
from datetime import timedelta
from ..utils.helper import to_datetime
from ..utils.constants import interpolation_map
# [docs]  (Sphinx doc-link residue; kept as a comment so the module parses)
def generate_laps_table(bronze_lake):
    """Build a per-driver laps table from the raw "TimingData" stream.

    Replays the timing messages for each driver in timestamp order and
    reconstructs one record per lap (sector times, speed-trap values, pit
    in/out timestamps, lap time, lap start time/date).

    Parameters
    ----------
    bronze_lake : object
        Project data-lake wrapper; must expose ``get("TimingData")``
        returning a DataFrame with a ``DriverNo`` column, a ``timestamp``
        column and the raw timing columns listed below, plus
        ``great_lake.session`` carrying ``first_datetime`` and
        ``session_start_datetime``.  (Assumed from usage — confirm against
        the BronzeLake API.)

    Returns
    -------
    pandas.DataFrame
        One row per (driver, lap) with a ``DriverNo`` column appended.
    """
    df_exp = bronze_lake.get("TimingData")

    # Raw stream column -> laps-table column. Sector "PreviousValue"
    # columns are consumed during replay but never emitted (mapped to None).
    sector_cols = {
        "Sectors_0_Value": "sector1_time",
        "Sectors_1_Value": "sector2_time",
        "Sectors_2_Value": "sector3_time",
        "Sectors_0_PreviousValue": None,
        "Sectors_1_PreviousValue": None,
        "Sectors_2_PreviousValue": None
    }
    speedTrap_cols = {
        "Speeds_I1_Value": "speed_I1",
        "Speeds_I2_Value": "speed_I2",
        "Speeds_FL_Value": "speed_FL",
        "Speeds_ST_Value": "speed_ST",
    }
    pit_cols = {
        "InPit": "in_pit",
        "PitOut": "pit_out"
    }
    base_cols = {
        "NumberOfLaps": "lap_number",
        "LastLapTime_Value": "lap_time"
    }
    extra_cols = ["no_pits"]
    extra_raw_cols = ["Stopped"]

    col_map = {**base_cols, **pit_cols, **sector_cols, **speedTrap_cols}
    cols = list(base_cols.values()) + list(pit_cols.values()) + list(sector_cols.values()) + list(speedTrap_cols.values())
    raw_cols = list(base_cols.keys()) + list(pit_cols.keys()) + list(sector_cols.keys()) + list(speedTrap_cols.keys()) + extra_raw_cols

    def str_timedelta(x):
        """Pad a "M:SS.mmm"-style string to "HH:MM:SS.mmm" so that
        pd.to_timedelta can parse it; non-strings pass through unchanged."""
        if isinstance(x, str):
            count_sep = x.count(":")
            if count_sep == 0:
                return "00:00:" + x
            elif count_sep == 1:
                return "00:" + x
            else:
                return x
        else:
            return x

    all_laps = []
    for driver_no in df_exp["DriverNo"].unique():
        df_driver = df_exp[df_exp["DriverNo"] == driver_no]
        # Keep only rows that carry at least one raw timing value.
        df_test = df_driver[["timestamp"] + raw_cols].dropna(subset=raw_cols, how="all").replace('', np.nan)
        for col in ["Sectors_0_Value", "Sectors_1_Value", "Sectors_2_Value",
                    "Sectors_0_PreviousValue", "Sectors_1_PreviousValue",
                    "Sectors_2_PreviousValue", "LastLapTime_Value"]:
            df_test[col] = pd.to_timedelta(df_test[col].apply(str_timedelta))

        def enter_new_lap(laps, record):
            """Finalize `record` into `laps` and start a fresh record.

            Called with (None, None) it initializes the replay state and
            returns (laps, record, last_record_ts); on subsequent calls it
            returns (laps, record) only.
            """
            if laps is None and record is None:
                no_pits = 0
                laps = []
                record = {key: None if key != "lap_number" else 1 for key in cols}
                record["no_pits"] = no_pits
                return [], record, timedelta(seconds=0)
            # If the stream never delivered a lap time but all three sector
            # times are known, derive the lap time from the sectors.
            if (record["lap_time"] is None) and (
                    record["sector1_time"] is not None
                    and record["sector2_time"] is not None
                    and record["sector3_time"] is not None):
                record["lap_time"] = record["sector1_time"] + record["sector2_time"] + record["sector3_time"]
            laps.append(record)
            # Carry the pit-stop counter across laps; everything else resets.
            no_pits = record["no_pits"]
            record = {key: None if key != "lap_number" else val + 1 for key, val in record.items()}
            record["no_pits"] = no_pits
            return laps, record

        new_lap_allowed = True
        laps, record, last_record_ts = enter_new_lap(None, None)
        for idx, row in df_test.iterrows():
            ts = pd.to_timedelta(row.timestamp)
            # `== True` (not a bare truth test) is deliberate: Stopped is
            # usually NaN, and NaN is truthy but compares unequal to True.
            if row.Stopped == True:
                laps, record = enter_new_lap(laps, record)
                continue
            if not pd.isnull(row.LastLapTime_Value):
                if not pd.isnull(row.Sectors_2_Value):
                    # Lap time arrived together with the final sector:
                    # it belongs to the lap currently being built.
                    record[col_map["LastLapTime_Value"]] = row.LastLapTime_Value
                elif not pd.isnull(row.Sectors_2_PreviousValue):
                    # Lap time arrived after the lap was closed:
                    # back-fill the previously appended lap.
                    laps[-1][col_map["LastLapTime_Value"]] = row.LastLapTime_Value
            for sc_key, sc_value in row.to_dict().items():
                if not pd.isna(sc_value):
                    if sc_key in speedTrap_cols:
                        record[col_map[sc_key]] = sc_value
                    elif sc_key in pit_cols:
                        if sc_key == "InPit":
                            if sc_value == 1:
                                record[col_map[sc_key]] = ts
                        elif sc_key == "PitOut":
                            if sc_value == True:
                                record[col_map[sc_key]] = ts
                                record["no_pits"] += 1
                    elif sc_key in sector_cols:
                        # Key shape is "Sectors_<n>_<Value|PreviousValue>".
                        sc_no = int(sc_key.split("_")[1])
                        key_type = sc_key.split("_")[2]
                        if key_type == "Value":
                            if record[f"sector{sc_no + 1}_time"] is None:
                                record[f"sector{sc_no + 1}_time"] = sc_value
                                last_record_ts = ts
                                if sc_no == 2:
                                    # Sector 3 closes the lap.
                                    laps, record = enter_new_lap(laps, record)
                                    record["lap_start_time"] = ts
                            # BUG FIX: was record[str(sc_no + 1)], a key that
                            # never exists (KeyError); the intent is "same
                            # sector value repeated -> ignore".
                            elif sc_value == record[f"sector{sc_no + 1}_time"]:
                                pass
                            elif ts - last_record_ts > timedelta(seconds=10):
                                # A different value for an already-filled
                                # sector after a quiet period: a new lap
                                # started without us seeing sector 3.
                                laps, record = enter_new_lap(laps, record)
                                record[f"sector{sc_no + 1}_time"] = sc_value
                                last_record_ts = ts
                        elif key_type == "PreviousValue" and ts - last_record_ts > timedelta(seconds=10):
                            record[f"sector{sc_no + 1}_time"] = sc_value
                            last_record_ts = ts
                            if sc_no == 2:
                                laps, record = enter_new_lap(laps, record)
        laps_df = pd.DataFrame(laps)
        laps_df["DriverNo"] = driver_no
        all_laps.append(laps_df)

    all_laps_df = pd.concat(all_laps, ignore_index=True)

    # For laps (after lap 1) missing exactly one sector, recover it as
    # lap_time minus the other two sectors; zero-duration placeholders are
    # converted back to NaT afterwards.
    segments = ["sector1_time", "sector2_time", "sector3_time"]
    for idx in range(len(segments)):
        rest = np.delete(segments, idx)
        all_laps_df[segments[idx]] = (
            all_laps_df[segments[idx]].fillna(timedelta(minutes=0))
            + (all_laps_df[segments[idx]].isnull() & (all_laps_df["lap_number"] > 1))
            * (all_laps_df[segments[idx]].isnull()
               * (all_laps_df["lap_time"].fillna(timedelta(minutes=0)) - all_laps_df[rest].sum(axis=1)))
        ).replace(timedelta(minutes=0), np.timedelta64("NaT"))

    # Each lap's start time is the previous lap's start time plus its lap
    # time; the first lap keeps its recorded start time.
    new_ts = (all_laps_df["lap_start_time"] + all_laps_df["lap_time"]).shift(1)
    all_laps_df["lap_start_time"] = (new_ts.isnull() * all_laps_df["lap_start_time"]) + new_ts.fillna(timedelta(0))
    all_laps_df["lap_start_date"] = (
        all_laps_df["lap_start_time"] + bronze_lake.great_lake.session.first_datetime
    ).fillna(bronze_lake.great_lake.session.session_start_datetime)
    # NOTE(review): the original ended with a bare
    # `all_laps_df[["lap_start_time"] + all_laps_df.columns.tolist()]`
    # expression — a no-op whose result was discarded (and which would have
    # duplicated lap_start_time); removed.
    return all_laps_df
# [docs]  (Sphinx doc-link residue; kept as a comment so the module parses)
def generate_car_telemetry_table(bronze_lake):
    """Build a per-driver car telemetry table aligned with the laps table.

    Joins the "CarData.z" (car channels) and "Position.z" (position)
    streams on driver and UTC time, interpolates the channels listed in
    ``interpolation_map``, and tags every sample with the lap number it
    falls into.

    Parameters
    ----------
    bronze_lake : object
        Project data-lake wrapper; must expose ``get(...)`` for the two
        raw streams and ``great_lake.session`` with ``laps`` and
        ``first_datetime``.  (Assumed from usage — confirm against the
        BronzeLake API.)

    Returns
    -------
    pandas.DataFrame
        Telemetry samples for all drivers, with ``lap_number``,
        ``SessionKey`` and a session-relative ``timestamp`` column.
    """
    session = bronze_lake.great_lake.session

    df_pos = bronze_lake.get("Position.z")
    df_pos["Utc"] = to_datetime(df_pos["Utc"])
    df_pos["timestamp"] = pd.to_timedelta(df_pos["timestamp"])

    df_car = bronze_lake.get("CarData.z")
    df_car["Utc"] = to_datetime(df_car["Utc"])
    df_car["timestamp"] = pd.to_timedelta(df_car["timestamp"])

    # Outer join keeps samples present in only one of the two streams.
    df = (
        df_car.set_index(["DriverNo", "Utc"])
        .join(df_pos.set_index(["DriverNo", "Utc"]), rsuffix="_pos", how="outer")
        .reset_index()
        .sort_values(["DriverNo", "Utc"])
    )

    all_drivers_data = []
    for driver_no in df["DriverNo"].unique():
        df_driver = df[df["DriverNo"] == driver_no].set_index("Utc")
        laps = session.laps
        # FIX: take an explicit copy of the slice — the original assigned
        # into a boolean-mask view (chained assignment), which triggers
        # SettingWithCopyWarning and is not guaranteed to stick.
        laps_driver = laps[laps["DriverNo"] == driver_no].copy()

        # Interpolate only the channels with a configured method;
        # order=2 is used by the spline/polynomial methods.
        for col in df_driver.columns:
            if col in interpolation_map:
                df_driver[col] = df_driver[col].interpolate(method=interpolation_map[col], order=2).values

        laps_driver["lap_end_date"] = laps_driver["lap_start_date"] + laps_driver["lap_time"]

        # Insert lap boundaries as index entries, then forward/back-fill so
        # every telemetry sample carries its lap number.
        df_driver = df_driver.join(
            laps_driver[["lap_start_date", "lap_number"]].set_index("lap_start_date"),
            how="outer",
        )
        df_driver["lap_number"] = df_driver["lap_number"].ffill().bfill()
        df_driver.index.names = ['Utc']
        df_driver = df_driver.reset_index()

        # Trim samples outside the driver's first lap start / last lap end.
        df_driver = df_driver[df_driver.Utc.between(
            laps_driver["lap_start_date"].min(), laps_driver["lap_end_date"].max()
        )]
        df_driver["SessionKey"] = df_driver["SessionKey"].ffill().bfill()
        df_driver["timestamp"] = df_driver["Utc"] - session.first_datetime
        all_drivers_data.append(df_driver)

    all_drivers_df = pd.concat(all_drivers_data, ignore_index=True)
    return all_drivers_df