Skip to content

GPS

GPS

GPS(path: Path, logpath: Path | None = None)

GPS sensor for reading UBX binary data.

Attributes:

Name Type Description
data_path Path

Path to GPS binary file (*.gps.bin).

logpath Path

Path to log file for timestamp extraction.

tstart Optional[datetime]

Start timestamp from log file (set after load_data).

data Optional[DataFrame]

Polars DataFrame with GPS data (None until load_data is called).

Initialize GPS sensor.

Parameters:

Name Type Description Default
path Path

Directory containing GPS binary file.

required
logpath Optional[Path]

Optional path to log file. If None, will be inferred.

None
Source code in pils/sensors/gps.py
def __init__(self, path: Path, logpath: Path | None = None) -> None:
    """
    Initialize GPS sensor.

    Parameters
    ----------
    path : Path
        Directory containing GPS binary file.
    logpath : Optional[Path], default=None
        Optional path to log file. If None, will be inferred.
    """

    files = list(path.glob("*"))

    for f in files:
        if f.name.lower().endswith("gps.bin"):
            self.data_path = f
    if logpath is not None:
        self.logpath = logpath
    else:
        self.logpath = get_logpath_from_datapath(self.data_path)

    self.tstart: datetime | None = None
    self.data: pl.DataFrame | None = None

load_data

load_data(freq_interpolation: float | None = None) -> None

Load GPS data from UBX binary file.

Reads GPS data in UBX protocol format and parses NAV messages. Merges different NAV message types (POSLLH, VELNED, STATUS, etc.) onto a common time grid with optional interpolation.

Parameters:

Name Type Description Default
freq_interpolation Optional[float]

Optional frequency for interpolation in Hz. If None, uses mean time difference from data. If provided, resamples data to this frequency.

None
Notes

Sets self.data: Polars DataFrame with: - unix_time_ms: Unix timestamp in milliseconds - datetime: UTC datetime - timestamp: Unix timestamp in seconds - NAV message columns (prefixed by message type)

Requires log file with "Sensor ZED-F9P started" entry for date extraction.

Source code in pils/sensors/gps.py
def load_data(self, freq_interpolation: float | None = None) -> None:
    """
    Load GPS data from UBX binary file.

    Reads GPS data in UBX protocol format and parses NAV messages.
    Merges different NAV message types (POSLLH, VELNED, STATUS, etc.)
    onto a common time grid with optional interpolation.

    Parameters
    ----------
    freq_interpolation : Optional[float], default=None
        Optional frequency for interpolation in Hz.
        If None, uses mean time difference from data.
        If provided, resamples data to this frequency.

    Notes
    -----
    Sets self.data: Polars DataFrame with:
    - unix_time_ms: Unix timestamp in milliseconds
    - datetime: UTC datetime
    - timestamp: Unix timestamp in seconds
    - NAV message columns (prefixed by message type)

    Requires log file with "Sensor ZED-F9P started" entry for date extraction.
    """

    # Dictionary to collect records from different NAV message types
    nav_records = {}

    with open(self.data_path, "rb") as stream:
        ubr = UBXReader(stream, protfilter=UBX_PROTOCOL, quitonerror=False)
        for _raw_data, parsed_data in ubr:
            if parsed_data is None:
                continue

            # Only process NAV messages
            if not parsed_data.identity.startswith("NAV-"):
                continue

            msg_type = parsed_data.identity

            # Initialize list for this message type if not exists
            if msg_type not in nav_records:
                nav_records[msg_type] = []

            # Extract all attributes from the parsed message
            record = {}
            for attr in dir(parsed_data):
                if not attr.startswith("_") and attr not in [
                    "identity",
                    "payload",
                    "msg_cls",
                    "msg_id",
                    "length",
                ]:
                    try:
                        value = getattr(parsed_data, attr)
                        # Only include serializable values (not methods)
                        if not callable(value):
                            record[attr] = value
                    except Exception:
                        pass

            nav_records[msg_type].append(record)

    # Convert each message type to a DataFrame with prefixed column names
    nav_dataframes = {}
    for msg_type, records in nav_records.items():
        if records:
            df = pl.DataFrame(records)
            # Prefix columns with message type (except iTOW which is used for joining)
            prefix = msg_type.replace("NAV-", "").lower()
            renamed_cols = {}
            for col in df.columns:
                if col != "iTOW":
                    renamed_cols[col] = f"{prefix}_{col}"
            if renamed_cols:
                df = df.rename(renamed_cols)
            nav_dataframes[msg_type] = df

    # Get the date from log file to compute GPS week
    time_start, date = read_log_time(
        keyphrase="Sensor ZED-F9P started", logfile=self.logpath
    )

    if date is None:
        self.data = pl.DataFrame()
        return

    # GPS epoch and leap seconds offset (GPS time is ahead of UTC by 18 seconds as of 2017)
    gps_epoch = datetime(1980, 1, 6).date()
    unix_epoch = datetime(1970, 1, 1).date()
    gps_leap_seconds = 18  # GPS-UTC offset in seconds

    duration = date - gps_epoch
    gps_week = duration.days // 7

    # Compute Unix timestamp in ms for each dataframe
    # GPS time = GPS epoch + (week * 7 days) + iTOW (in ms)
    # Unix time = GPS time - (GPS epoch - Unix epoch) - leap_seconds
    gps_to_unix_offset_ms = (gps_epoch - unix_epoch).days * 24 * 60 * 60 * 1000
    leap_seconds_ms = gps_leap_seconds * 1000
    week_ms = gps_week * 7 * 24 * 60 * 60 * 1000

    for msg_type, df in nav_dataframes.items():
        if "iTOW" in df.columns:
            first_itow = df["iTOW"][0]
            # Create Unix timestamp in ms
            # unix_time_ms = gps_week_ms + iTOW + gps_epoch_offset - leap_seconds
            df = df.with_columns(
                [
                    (
                        pl.col("iTOW")
                        + pl.lit(week_ms + gps_to_unix_offset_ms - leap_seconds_ms)
                    ).alias("unix_time_ms"),
                    # Relative datetime: time_start + (iTOW - iTOW[0])
                    (
                        pl.lit(time_start)
                        + pl.duration(milliseconds=pl.col("iTOW") - first_itow)
                    ).alias("datetime_relative"),
                ]
            )
            nav_dataframes[msg_type] = df

    # Combine dataframes based on a common time grid with interpolation
    gps_data = self._merge_nav_dataframes(nav_dataframes, freq=freq_interpolation)

    if gps_data is None or len(gps_data) == 0:
        gps_data = pl.DataFrame()
        self.data = gps_data
        return

    # Convert unix_time_ms to datetime (UTC)
    gps_data = gps_data.with_columns(
        (pl.from_epoch(pl.col("unix_time_ms"), time_unit="ms")).alias("datetime")
    )
    gps_data = gps_data.with_columns(
        (pl.col("unix_time_ms") / 1000.0).alias("timestamp")
    )
    gps_data = gps_data.with_columns(pl.col("posllh_height") / 1000.0)

    self.data = gps_data