Skip to content

PPK Analysis

PPKAnalysis

PPKAnalysis(flight: Flight)

Standalone PPK analysis manager with smart execution and versioning.

Manages RTKLIB-based post-processing of GPS data with intelligent re-run logic, automatic versioning, and HDF5 persistence. Completely separate from Flight class.

The analysis system: - Only runs RTKLIB when config changes (SHA256 hash comparison) - Stores each revision in its own folder with .pos, .stat, .conf files - Saves all versions to a single ppk_solution.h5 HDF5 file - Generates auto-timestamp version names

File Structure

flight_dir/proc/ppk/
├── ppk_solution.h5          # HDF5 with all versions
└── rev_20260204_143022/     # Per-revision folder
    ├── solution.pos         # RTKLIB position output
    ├── solution.pos.stat    # RTKLIB statistics output
    └── config.conf          # RTKLIB config used

Attributes:

Name Type Description
flight_path Path

Root flight directory

ppk_dir Path

PPK directory ({flight_path}/proc/ppk/)

hdf5_path Path

Path to ppk_solution.h5 file

versions Dict[str, PPKVersion]

Dictionary of loaded PPK versions

Examples:

>>> from pils.flight import Flight
>>> # Create Flight object
>>> flight_info = {
...     "drone_data_folder_path": "/path/to/flight/drone",
...     "aux_data_folder_path": "/path/to/flight/aux"
... }
>>> flight = Flight(flight_info)
>>> # Create new analysis
>>> ppk = PPKAnalysis(flight)
>>> version = ppk.run_analysis('rtklib.conf')  # Smart execution
>>> # Only runs if config changed
>>> version2 = ppk.run_analysis('rtklib.conf')  # Skipped if same
>>> # Force re-run
>>> version3 = ppk.run_analysis('rtklib.conf', force=True)
>>> # Access versions
>>> latest = ppk.get_latest_version()
>>> all_versions = ppk.list_versions()

Initialize PPKAnalysis for a flight.

Creates the proc/ppk directory structure if it doesn't exist.

Parameters:

Name Type Description Default
flight Flight

Flight object with valid flight_path attribute

required

Raises:

Type Description
TypeError

If flight is not a Flight object

ValueError

If flight.flight_path is None or not an existing directory

Examples:

>>> from pils.flight import Flight
>>> flight_info = {
...     "drone_data_folder_path": "/path/to/flight/drone",
...     "aux_data_folder_path": "/path/to/flight/aux"
... }
>>> flight = Flight(flight_info)
>>> ppk = PPKAnalysis(flight)
>>> print(ppk.ppk_dir)
/path/to/flight/proc/ppk
Source code in pils/analyze/ppk.py
def __init__(self, flight: Flight):
    """
    Set up PPK analysis for a single flight.

    Validates the given Flight object and ensures the ``proc/ppk``
    directory tree exists under the flight root.

    Parameters
    ----------
    flight : Flight
        Flight object with valid flight_path attribute

    Raises
    ------
    TypeError
        If ``flight`` is not a Flight instance.
    ValueError
        If ``flight.flight_path`` is None or not an existing directory.

    Examples
    --------
    >>> from pils.flight import Flight
    >>> flight_info = {
    ...     "drone_data_folder_path": "/path/to/flight/drone",
    ...     "aux_data_folder_path": "/path/to/flight/aux"
    ... }
    >>> flight = Flight(flight_info)
    >>> ppk = PPKAnalysis(flight)
    >>> print(ppk.ppk_dir)
    /path/to/flight/proc/ppk
    """

    # Reject legacy call style (a bare path) with explicit guidance.
    if not isinstance(flight, Flight):
        raise TypeError(
            f"Expected Flight object, got {type(flight).__name__}. "
            "PPKAnalysis now requires a Flight object instead of a path."
        )

    # The Flight must carry a usable root path.
    if flight.flight_path is None:
        raise ValueError(
            "Flight object must have a valid flight_path attribute. "
            "Ensure the Flight was initialized with proper flight_info."
        )

    # Normalize to Path and require an existing directory.
    root = Path(flight.flight_path)
    if not (root.exists() and root.is_dir()):
        raise ValueError(
            f"flight_path must be an existing directory. Got: {root}"
        )

    self.flight_path = root
    self.ppk_dir = root / "proc" / "ppk"
    self.hdf5_path = self.ppk_dir / "ppk_solution.h5"
    self.versions: dict[str, PPKVersion] = {}

    # Idempotent: safe when the structure already exists.
    self.ppk_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Initialized PPKAnalysis for flight: {self.flight_path}")
    logger.info(f"PPK directory: {self.ppk_dir}")

get_latest_version

get_latest_version() -> PPKVersion | None

Get the most recent PPK version.

Returns the version with the latest timestamp based on version name sorting (rev_YYYYMMDD_HHMMSS format sorts chronologically).

Returns:

Type Description
Optional[PPKVersion]

Latest PPKVersion or None if no versions exist

Examples:

>>> ppk = PPKAnalysis.from_hdf5(flight)
>>> latest = ppk.get_latest_version()
>>> if latest:
...     print(f"Latest: {latest.version_name}")
...     print(latest.pos_data)
Source code in pils/analyze/ppk.py
def get_latest_version(self) -> PPKVersion | None:
    """
    Return the newest PPK version, or None when no versions are loaded.

    Version names follow the ``rev_YYYYMMDD_HHMMSS`` scheme, so their
    lexicographic order coincides with chronological order and the
    maximum key identifies the most recent revision.

    Returns
    -------
    Optional[PPKVersion]
        Latest PPKVersion or None if no versions exist

    Examples
    --------
    >>> ppk = PPKAnalysis.from_hdf5(flight)
    >>> latest = ppk.get_latest_version()
    >>> if latest:
    ...     print(f"Latest: {latest.version_name}")
    ...     print(latest.pos_data)
    """
    if not self.versions:
        return None

    # max() over the keys picks the chronologically latest name.
    return self.versions[max(self.versions)]

run_analysis

run_analysis(config_path: str | Path, rover_obs: str | Path | None = None, base_obs: str | Path | None = None, nav_file: str | Path | None = None, force: bool = False, analyze_rinex: bool = False, analyze_ppk: bool = False) -> PPKVersion | None

Execute RTKLIB PPK analysis with smart re-run logic.

Smart execution: - Runs only if config changed (hash comparison) or force=True - Generates auto-timestamp version name - Creates revision folder with .pos, .stat, .conf files - Executes RTKLIB rnx2rtkp subprocess - Parses results using POSAnalyzer and STATAnalyzer - Saves to HDF5

Parameters:

Name Type Description Default
config_path Union[str, Path]

Path to RTKLIB configuration file

required
rover_obs Union[str, Path]

Path to rover RINEX observation file. If None, first look for available files in the folder otherwise use convbin for conversion (default: None)

None
base_obs Union[str, Path]

Path to base RINEX observation file. If None, first look for available files in the folder otherwise use convbin for conversion (default: None)

None
nav_file Union[str, Path]

Path to navigation file. If None, first look for available files in the folder otherwise use convbin for conversion (default: None)

None
force bool

If True, force re-run even if config unchanged (default: False)

False
analyze_rinex bool

If True, analyze RINEX files (default: False)

False
analyze_ppk bool

If True, analyze PPK results (default: False)

False

Returns:

Type Description
Optional[PPKVersion]

PPKVersion object with results, or None if execution failed

Raises:

Type Description
FileNotFoundError

If config file doesn't exist

Examples:

>>> ppk = PPKAnalysis(flight)
>>> # Smart execution - only runs if needed
>>> v1 = ppk.run_analysis(
...     'rtklib.conf',
...     'rover.obs',
...     'base.obs',
...     'nav.nav'
... )
>>> v2 = ppk.run_analysis(...)  # Skipped if config same
>>> # Force re-run
>>> v3 = ppk.run_analysis(..., force=True)
Source code in pils/analyze/ppk.py
def run_analysis(
    self,
    config_path: str | Path,
    rover_obs: str | Path | None = None,
    base_obs: str | Path | None = None,
    nav_file: str | Path | None = None,
    force: bool = False,
    analyze_rinex: bool = False,
    analyze_ppk: bool = False,
) -> PPKVersion | None:
    """
    Execute RTKLIB PPK analysis with smart re-run logic.

    Smart execution:
    - Runs only if config changed (hash comparison) or force=True
    - Generates auto-timestamp version name
    - Creates revision folder with .pos, .stat, .conf files
    - Executes RTKLIB rnx2rtkp subprocess
    - Parses results using POSAnalyzer and STATAnalyzer
    - Saves to HDF5

    Parameters
    ----------
    config_path : Union[str, Path]
        Path to RTKLIB configuration file
    rover_obs : Union[str, Path], optional
        Path to rover RINEX observation file. If None, first look for available
        files in the folder otherwise use convbin for conversion (default: None)
    base_obs : Union[str, Path], optional
        Path to base RINEX observation file. If None, first look for available
        files in the folder otherwise use convbin for conversion (default: None)
    nav_file : Union[str, Path], optional
        Path to navigation file. If None, first look for available
        files in the folder otherwise use convbin for conversion (default: None)
    force : bool, optional
        If True, force re-run even if config unchanged (default: False)
    analyze_rinex : bool, optional
        If True, analyze RINEX files (default: False)
    analyze_ppk : bool, optional
        If True, analyze PPK results (default: False)

    Returns
    -------
    Optional[PPKVersion]
        PPKVersion object with results, or None if execution failed

    Raises
    ------
    FileNotFoundError
        If config file doesn't exist, or a convbin conversion fails to
        produce the expected .obs/.nav output
    ValueError
        If rover observation time bounds cannot be determined when the
        base file must be trimmed

    Examples
    --------
    >>> ppk = PPKAnalysis(flight)
    >>> # Smart execution - only runs if needed
    >>> v1 = ppk.run_analysis(
    ...     'rtklib.conf',
    ...     'rover.obs',
    ...     'base.obs',
    ...     'nav.nav'
    ... )
    >>> v2 = ppk.run_analysis(...)  # Skipped if config same
    >>> # Force re-run
    >>> v3 = ppk.run_analysis(..., force=True)
    """

    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Config file not found: {config_path}")

    # Skip only when the config is unchanged AND a previous result exists
    # to hand back; otherwise fall through and run the analysis anyway.
    if not force and not self._should_run_analysis(config_path):
        logger.info("Skipping analysis - config unchanged")
        latest = self.get_latest_version()
        if latest is not None:
            return latest

    # Generate version name and create folder
    version_name = self._generate_version_name()
    revision_path = self._create_revision_folder(version_name)

    # Initialize nav file variables
    rover_nav: Path | None = None
    base_nav: Path | None = None

    # ---- Resolve rover observation/navigation files ----
    if rover_obs is not None:
        rover_obs = Path(rover_obs)
        # Infer rover nav file if it sits next to the obs file
        potential_nav = rover_obs.with_suffix(".nav")
        if potential_nav.exists():
            rover_nav = potential_nav
    else:
        obs_path = self.ppk_dir / "rover.obs"
        nav_path = self.ppk_dir / "rover.nav"

        if obs_path.exists() and nav_path.exists():
            rover_obs = obs_path
            rover_nav = nav_path

        else:
            rover_path = self.flight_path / "aux" / "sensors"

            try:
                rover_ubx = list(rover_path.glob("*_GPS.bin"))[0]
            except IndexError:
                logger.info("Missing rover .bin file")
                return None

            # Convert the raw u-blox log to RINEX obs/nav with convbin
            cmd = [
                "convbin",
                "-od",
                "-os",
                "-oi",
                "-ot",
                "-ol",
                "-r",
                "ubx",
                "-o",
                str(obs_path),
                "-n",
                str(nav_path),
                str(rover_ubx),
            ]

            subprocess.run(cmd, check=True)

            if not obs_path.exists():
                logger.info("rover obs file not created")
                raise FileNotFoundError("Conversion failed: .obs file not created")
            # BUGFIX: previously re-checked obs_path here; the second
            # check must verify the nav output.
            if not nav_path.exists():
                logger.info("rover nav file not created")
                raise FileNotFoundError("Conversion failed: .nav file not created")

            rover_obs = obs_path
            rover_nav = nav_path

    start_rover, end_rover = self._get_rinex_bounds(rover_obs)

    # ---- Resolve base observation/navigation files ----
    if base_obs is not None:
        base_obs = Path(base_obs)
        # Infer base nav file if it sits next to the obs file
        potential_nav = base_obs.with_suffix(".nav")
        if potential_nav.exists():
            base_nav = potential_nav
    else:
        obs_path = self.ppk_dir / "base.obs"
        nav_path = self.ppk_dir / "base.nav"

        if obs_path.exists() and nav_path.exists():
            base_obs = obs_path
            base_nav = nav_path
        else:
            day_path = self.flight_path.parent
            base_path = day_path / "base"

            # Check if rover bounds are valid
            if start_rover is None or end_rover is None:
                raise ValueError(
                    "Could not determine rover observation time bounds"
                )

            # Trim the base log to the rover window +/- 10 minutes
            start = start_rover - timedelta(minutes=10)
            date_start, time_start = start.strftime("%Y/%m/%d %H:%M:%S").split(" ")
            end = end_rover + timedelta(minutes=10)
            date_end, time_end = end.strftime("%Y/%m/%d %H:%M:%S").split(" ")

            # BUGFIX: guard against a missing base .ubx file (was an
            # unhandled IndexError); mirrors the rover handling above.
            try:
                base_ubx = list(base_path.glob("*.[uU][bB][xX]"))[0]
            except IndexError:
                logger.info("Missing base .ubx file")
                return None

            cmd = [
                "convbin",
                "-od",
                "-os",
                "-oi",
                "-ot",
                "-ol",
                "-r",
                "ubx",
                "-ts",
                date_start,
                time_start,
                "-te",
                date_end,
                time_end,
                "-o",
                str(obs_path),
                "-n",
                str(nav_path),
                str(base_ubx),
            ]

            subprocess.run(cmd, check=True)

            if not obs_path.exists():
                # BUGFIX: log message previously said "rover" in this
                # base-conversion branch.
                logger.info("base obs file not created")
                raise FileNotFoundError("Conversion failed: .obs file not created")
            # BUGFIX: previously re-checked obs_path here; the second
            # check must verify the nav output.
            if not nav_path.exists():
                logger.info("base nav file not created")
                raise FileNotFoundError("Conversion failed: .nav file not created")

            base_obs = obs_path
            base_nav = nav_path

    # ---- Resolve the navigation file used for processing ----
    if nav_file is not None:
        nav_file = Path(nav_file)
    else:
        # Use the larger navigation file if both exist
        if base_nav and rover_nav:
            nav_file = max(base_nav, rover_nav, key=lambda p: p.stat().st_size)
        elif base_nav:
            nav_file = base_nav
        elif rover_nav:
            nav_file = rover_nav
        else:
            raise FileNotFoundError("No navigation file available (base or rover)")

    if not rover_obs.exists():
        raise FileNotFoundError(f"Rover observation file not found: {rover_obs}")
    if not base_obs.exists():
        raise FileNotFoundError(f"Base observation file not found: {base_obs}")
    if not nav_file.exists():
        raise FileNotFoundError(f"Navigation file not found: {nav_file}")

    # Optional RINEX quality reports (skipped if already generated)
    if analyze_rinex:
        obs_files = [rover_obs, base_obs]
        nav_files = [rover_nav, base_nav]

        names = ["rover", "base"]

        for i, obs in enumerate(obs_files):
            report_md = self.ppk_dir / f"report_{names[i]}.md"

            if report_md.exists():
                continue
            else:
                report = RINEXReport(obs, nav_files[i])

                plot_folder = self.ppk_dir / f"rinex_plots_{names[i]}"

                report.generate(str(report_md), str(plot_folder.name))

    logger.info(f"Running PPK analysis: {version_name}")

    # Copy config to revision folder
    config_copy = revision_path / config_path.name
    shutil.copy2(config_path, config_copy)
    logger.info(f"Copied config to {config_copy}")

    # Define output file paths
    pos_file = revision_path / "solution.pos"
    stat_file = revision_path / "solution.pos.stat"

    # Run the RTKLIB post-processing engine
    cmd = [
        "rnx2rtkp",
        "-k",
        str(config_copy),
        "-o",
        str(pos_file),
        str(rover_obs),
        str(base_obs),
        str(nav_file),
    ]

    subprocess.run(cmd, check=True)

    # Optional PPK result report
    if analyze_ppk:
        report_md = revision_path / "report_ppk.md"

        report = RTKLIBReport(pos_file, stat_file)

        plot_folder = revision_path / "plots"

        report.generate(str(report_md.parent), str(plot_folder.name))

    # Check if output files were created
    if not pos_file.exists():
        logger.error(f"Position file not created: {pos_file}")
        # Clean up revision folder on failure
        if revision_path.exists():
            shutil.rmtree(revision_path)
            logger.info(f"Cleaned up failed revision folder: {revision_path}")
        return None

    # Parse position file
    try:
        pos_analyzer = POSAnalyzer(str(pos_file))
        pos_data = pos_analyzer.parse()
        logger.info(f"Parsed position: {len(pos_data)} epochs")
    except Exception as e:
        logger.error(f"Failed to parse position file: {e}")
        # Clean up revision folder on failure
        if revision_path.exists():
            shutil.rmtree(revision_path)
            logger.info(f"Cleaned up failed revision folder: {revision_path}")
        return None

    # Parse statistics file (optional - analysis succeeds without it)
    if not stat_file.exists():
        logger.warning(f"Statistics file not created: {stat_file}")
        stat_data = None
    else:
        try:
            stat_analyzer = STATAnalyzer(str(stat_file))
            stat_data = stat_analyzer.parse()
            logger.info(f"Parsed statistics: {len(stat_data)} records")
        except Exception as e:
            logger.error(f"Failed to parse statistics file: {e}")
            stat_data = None

    # Create metadata with config hash and parsed parameters
    config_hash = self._hash_config(config_path)
    config_params = self._parse_config_params(config_path)
    metadata = {
        "config_hash": config_hash,
        "timestamp": datetime.now().isoformat(),
        "config_params": config_params,
        "rover_obs": str(rover_obs),
        "base_obs": str(base_obs),
        "nav_file": str(nav_file),
        "pos_file": str(pos_file),
        "stat_file": str(stat_file) if stat_data is not None else None,
    }

    # Create version object (handle None stat_data)
    if stat_data is None:
        stat_data = pl.DataFrame()

    version = PPKVersion(
        version_name=version_name,
        pos_data=pos_data,
        stat_data=stat_data,
        metadata=metadata,
        revision_path=revision_path,
    )

    # Store in versions dict
    self.versions[version_name] = version

    # Save to HDF5 automatically
    self._save_version_to_hdf5(version)

    logger.info(f"PPK analysis complete: {version_name}")
    return version

from_hdf5 classmethod

from_hdf5(flight: Flight) -> PPKAnalysis

Load existing PPKAnalysis from HDF5 file.

Creates a PPKAnalysis instance and loads all stored versions from ppk_solution.h5. If no HDF5 file exists, returns empty PPKAnalysis.

Parameters:

Name Type Description Default
flight Flight

Flight object with valid flight_path attribute

required

Returns:

Type Description
PPKAnalysis

PPKAnalysis instance with loaded versions

Raises:

Type Description
TypeError

If flight is not a Flight object

ValueError

If flight.flight_path is None or not an existing directory

Examples:

>>> from pils.flight import Flight
>>> flight_info = {
...     "drone_data_folder_path": "/path/to/flight/drone",
...     "aux_data_folder_path": "/path/to/flight/aux"
... }
>>> flight = Flight(flight_info)
>>> ppk = PPKAnalysis.from_hdf5(flight)
>>> print(f"Loaded {len(ppk.versions)} versions")
>>> for version_name in ppk.list_versions():
...     print(f"  - {version_name}")
Source code in pils/analyze/ppk.py
@classmethod
def from_hdf5(cls, flight: Flight) -> PPKAnalysis:
    """
    Reconstruct a PPKAnalysis from a previously saved HDF5 archive.

    Instantiates the analysis for ``flight`` and pulls every stored
    version group out of ppk_solution.h5. When no archive exists the
    instance is returned empty.

    Parameters
    ----------
    flight : Flight
        Flight object with valid flight_path attribute

    Returns
    -------
    PPKAnalysis
        PPKAnalysis instance with loaded versions

    Raises
    ------
    TypeError
        If flight is not a Flight object
    ValueError
        If flight.flight_path is None or not an existing directory

    Examples
    --------
    >>> from pils.flight import Flight
    >>> flight_info = {
    ...     "drone_data_folder_path": "/path/to/flight/drone",
    ...     "aux_data_folder_path": "/path/to/flight/aux"
    ... }
    >>> flight = Flight(flight_info)
    >>> ppk = PPKAnalysis.from_hdf5(flight)
    >>> print(f"Loaded {len(ppk.versions)} versions")
    >>> for version_name in ppk.list_versions():
    ...     print(f"  - {version_name}")
    """
    import h5py

    analysis = cls(flight)

    # Nothing persisted yet -> hand back an empty analysis.
    if not analysis.hdf5_path.exists():
        logger.info("No HDF5 file found, returning empty PPKAnalysis")
        return analysis

    # Each top-level HDF5 group is one stored version; load what we can
    # and warn (rather than fail) on corrupt entries.
    with h5py.File(analysis.hdf5_path, "r") as h5:
        for name in h5.keys():
            try:
                analysis._load_version_from_hdf5(name)
                logger.info(f"Loaded version: {name}")
            except Exception as exc:
                logger.warning(f"Failed to load version {name}: {exc}")

    logger.info(f"Loaded {len(analysis.versions)} versions from HDF5")
    return analysis

list_versions

list_versions() -> list[str]

Return list of all version names in chronological order.

Version names use timestamp format (rev_YYYYMMDD_HHMMSS) which ensures alphabetical sorting equals chronological sorting.

Returns:

Type Description
List[str]

Sorted list of version names

Examples:

>>> ppk = PPKAnalysis.from_hdf5(flight)
>>> versions = ppk.list_versions()
>>> print(versions)
['rev_20260204_140000', 'rev_20260204_150000', 'rev_20260204_160000']
Source code in pils/analyze/ppk.py
def list_versions(self) -> list[str]:
    """
    Return all version names, oldest first.

    The ``rev_YYYYMMDD_HHMMSS`` naming scheme makes alphabetical order
    identical to chronological order, so a plain sort suffices.

    Returns
    -------
    List[str]
        Sorted list of version names

    Examples
    --------
    >>> ppk = PPKAnalysis.from_hdf5(flight)
    >>> versions = ppk.list_versions()
    >>> print(versions)
    ['rev_20260204_140000', 'rev_20260204_150000', 'rev_20260204_160000']
    """
    # sorted() over the dict iterates its keys directly.
    return sorted(self.versions)

get_version

get_version(version_name: str) -> PPKVersion | None

Get specific version by name.

Parameters:

Name Type Description Default
version_name str

Version identifier (e.g., 'rev_20260204_143022')

required

Returns:

Type Description
Optional[PPKVersion]

PPKVersion if found, None otherwise

Examples:

>>> ppk = PPKAnalysis.from_hdf5(flight)
>>> version = ppk.get_version('rev_20260204_143022')
>>> if version:
...     print(version.pos_data)
Source code in pils/analyze/ppk.py
def get_version(self, version_name: str) -> PPKVersion | None:
    """
    Look up a single version by its identifier.

    Parameters
    ----------
    version_name : str
        Version identifier (e.g., 'rev_20260204_143022')

    Returns
    -------
    Optional[PPKVersion]
        PPKVersion if found, None otherwise

    Examples
    --------
    >>> ppk = PPKAnalysis.from_hdf5(flight)
    >>> version = ppk.get_version('rev_20260204_143022')
    >>> if version:
    ...     print(version.pos_data)
    """
    # EAFP lookup: unknown names yield None rather than raising.
    try:
        return self.versions[version_name]
    except KeyError:
        return None

delete_version

delete_version(version_name: str) -> None

Delete a version from HDF5 and filesystem.

Removes version from: 1. self.versions dict 2. HDF5 file (deletes group) 3. Filesystem (deletes revision folder)

Parameters:

Name Type Description Default
version_name str

Version identifier to delete

required

Examples:

>>> ppk = PPKAnalysis.from_hdf5(flight)
>>> ppk.delete_version('rev_20260204_140000')
>>> print(ppk.list_versions())  # Version gone
Source code in pils/analyze/ppk.py
def delete_version(self, version_name: str) -> None:
    """
    Remove a version everywhere it is stored.

    Clears the entry from ``self.versions``, deletes its revision folder
    from disk, and drops its group from the HDF5 archive. Unknown names
    are tolerated at each step.

    Parameters
    ----------
    version_name : str
        Version identifier to delete

    Examples
    --------
    >>> ppk = PPKAnalysis.from_hdf5(flight)
    >>> ppk.delete_version('rev_20260204_140000')
    >>> print(ppk.list_versions())  # Version gone
    """
    import shutil

    import h5py

    # Drop the in-memory entry (no-op when absent) and remove its
    # on-disk revision folder if one exists.
    removed = self.versions.pop(version_name, None)
    if removed is not None and removed.revision_path.exists():
        shutil.rmtree(removed.revision_path)
        logger.info(f"Deleted revision folder: {removed.revision_path}")

    # Drop the matching group from the HDF5 archive, if present.
    if self.hdf5_path.exists():
        with h5py.File(self.hdf5_path, "a") as h5:
            if version_name in h5:
                del h5[version_name]
                logger.info(f"Deleted version {version_name} from HDF5")

    logger.info(f"Version {version_name} deleted")

PPKVersion dataclass

PPKVersion(version_name: str, pos_data: DataFrame, stat_data: DataFrame, metadata: dict[str, Any], revision_path: Path)

Container for a single PPK analysis revision.

Each revision represents one execution of RTKLIB with specific configuration parameters, storing position solutions, statistics, and metadata.

Attributes:

Name Type Description
version_name str

Auto-timestamp version identifier (rev_YYYYMMDD_HHMMSS)

pos_data DataFrame

Position solution DataFrame from .pos file

stat_data DataFrame

Processing statistics DataFrame from .pos.stat file

metadata Dict[str, Any]

Config hash, parsed parameters, and execution metadata

revision_path Path

Path to revision folder containing .pos, .stat, .conf files

Examples:

>>> version = PPKVersion(
...     version_name='rev_20260204_143022',
...     pos_data=pos_df,
...     stat_data=stat_df,
...     metadata={'config_hash': 'abc123'},
...     revision_path=Path('/flight/proc/ppk/rev_20260204_143022')
... )

RINEX Analysis

RINEXAnalyzer

RINEXAnalyzer(obspath: Path, navpath: Path | None = None)

High-performance RINEX quality analyzer using Polars.

Analyzes GNSS RINEX observation files for data quality assessment, including SNR analysis, multipath detection, cycle slip detection, and satellite geometry metrics.

Attributes:

Name Type Description
obspath str

Path to RINEX file

filename str

Name of RINEX file

df DataFrame

Parsed observations data

header_info dict

Header metadata from RINEX file

obs_types dict

Observation types by constellation

epochs list

List of observation epochs

azel_df DataFrame

Azimuth-elevation data for satellites

glo_slots dict

GLONASS frequency slot assignments

Examples:

>>> analyzer = RINEXAnalyzer('station.obs')
>>> analyzer.parse_obs_file()
>>> quality = analyzer.assess_data_quality()
>>> print(f"Quality score: {quality['score']}")

Initialize RINEX analyzer.

Args: obspath: Path to RINEX observation file

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def __init__(self, obspath: Path, navpath: Path | None = None) -> None:
    """Initialize RINEX analyzer.

    Args:
        obspath: Path to RINEX observation file
        navpath: Path to the matching RINEX navigation file, or None
            when no navigation data is available
    """
    self.obspath = obspath
    self.obsname = Path(obspath).name

    # BUGFIX: always define the nav attributes. Previously they were
    # only set when navpath was given, so code reading self.navpath
    # (e.g. parse_nav_file) hit AttributeError for obs-only analyzers.
    self.navpath = navpath
    self.navname = Path(navpath).name if navpath is not None else None

    # Containers populated later by the parse_* methods.
    self.df_obs = pl.DataFrame()
    self.header_info = {}
    self.obs_types = {}
    self.epochs = []
    self.azel_df = pl.DataFrame()
    self.glo_slots = {}

parse_obs_file

parse_obs_file(snr_only: bool = False, sample_rate: int = 1)

Parse RINEX file into Polars DataFrame.

Performs full-fidelity parsing of RINEX observation file, extracting all observation types and metadata.

Args: snr_only: If True, only parse SNR observations sample_rate: Epoch sampling rate (1 = all epochs)

Returns: Polars DataFrame with parsed observations

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> df = analyzer.parse(sample_rate=30) # Every 30s >>> print(df.shape)

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def parse_obs_file(self, snr_only: bool = False, sample_rate: int = 1):
    """Parse RINEX file into Polars DataFrame.

    Performs full-fidelity parsing of RINEX observation file,
    extracting all observation types and metadata. Results are stored
    on the instance: ``self.df_obs`` (long-format observations),
    ``self.epochs``, ``self.obs_types``, ``self.glo_slots`` and
    ``self.header_info``. The method itself returns None.

    Args:
        snr_only: If True, only parse SNR observations (types starting
            with 'S')
        sample_rate: Epoch decimation factor (1 = keep all epochs).
            With N > 1 only every N-th epoch is kept; note the first
            N-1 epochs are dropped because the counter starts at 1.

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file(sample_rate=30)  # Every 30th epoch
        >>> print(analyzer.df_obs.shape)
    """
    logger.info(f"Parsing RINEX file: {self.obsname}")
    in_header = True
    records = []
    epoch_counter = 0
    current_epoch = None  # Initialize before loop to avoid unbound errors

    with open(self.obspath) as f:
        for line in f:
            if in_header:
                # Header section: collect observation types, GLONASS
                # slots and approximate receiver position until the
                # "END OF HEADER" marker.
                if "END OF HEADER" in line:
                    in_header = False
                elif "SYS / # / OBS TYPES" in line:
                    # Column 0 is the constellation letter; columns 7-60
                    # list the observation type codes for that system.
                    # NOTE(review): RINEX continuation lines leave the
                    # system char blank, which would land under a " "
                    # key here — confirm inputs fit on single lines.
                    sys = line[0]
                    t = line[7:60].split()
                    if sys not in self.obs_types:
                        self.obs_types[sys] = t
                    else:
                        self.obs_types[sys].extend(t)
                elif "GLONASS SLOT / FRQ #" in line:
                    content = line[:60]
                    # First line may have a count in the first 3 chars
                    start_idx = 4 if content[:3].strip().isdigit() else 0
                    # Remaining tokens come in (slot id, frequency
                    # number) pairs.
                    parts = content[start_idx:].split()
                    for j in range(0, len(parts), 2):
                        if j + 1 < len(parts):
                            self.glo_slots[parts[j]] = int(parts[j + 1])
                elif "APPROX POSITION XYZ" in line:
                    # ECEF X/Y/Z in meters from the header.
                    parts = line[:60].split()
                    if len(parts) >= 3:
                        self.header_info["position"] = [
                            float(parts[0]),
                            float(parts[1]),
                            float(parts[2]),
                        ]
                continue

            if line.startswith(">"):
                # RINEX 3 epoch record: "> YYYY MM DD HH MM SS.SSS ...".
                try:
                    p = line.split()
                    dt = datetime(
                        int(p[1]),
                        int(p[2]),
                        int(p[3]),
                        int(p[4]),
                        int(p[5]),
                        int(float(p[6])),
                    )
                    # Re-add the fractional second truncated by int().
                    dt += timedelta(seconds=float(p[6]) % 1)
                    epoch_counter += 1
                    # Decimation: current_epoch stays None for skipped
                    # epochs, which suppresses their observation lines.
                    current_epoch = dt if epoch_counter % sample_rate == 0 else None
                    if current_epoch:
                        self.epochs.append(dt)
                except (ValueError, IndexError):
                    # Malformed epoch line: drop it and everything
                    # belonging to it.
                    current_epoch = None
                continue

            if current_epoch is None:
                continue
            if len(line) < 4:
                continue
            # Columns 0-2 hold the satellite id (e.g. "G01"); its first
            # char is the constellation letter.
            sat_id = line[0:3].strip()
            const = sat_id[0]
            if const not in self.obs_types:
                continue

            obs_line = line[3:]
            obs_list = self.obs_types[const]
            # Each observation occupies a fixed 16-char field:
            # 14 chars value, 1 char LLI flag, 1 char signal strength.
            for i, obs_type in enumerate(obs_list):
                if snr_only and not obs_type.startswith("S"):
                    continue
                start = i * 16
                if start >= len(obs_line):
                    break
                val_str = obs_line[start : start + 14].strip()
                if not val_str:
                    continue

                # LLI is the character at index 14 of the 16-char block
                lli_str = obs_line[start + 14 : start + 15].strip()
                lli = int(lli_str) if lli_str else 0

                try:
                    if current_epoch is not None:
                        records.append(
                            {
                                "time": current_epoch,
                                "satellite": sat_id,
                                "constellation": const,
                                # obs_type is e.g. "S1C": char 0 is the
                                # measurement kind, char 1 the band digit.
                                "frequency": get_frequency_band(const, obs_type[1]),
                                "obs_type": obs_type[0],
                                "value": float(val_str),
                                "lli": lli,
                            }
                        )
                except ValueError:
                    pass  # Non-numeric value field: skip this observation.

    self.df_obs = pl.DataFrame(records)
    logger.info(
        f"Parsed {len(self.df_obs)} observations across {len(self.epochs)} epochs"
    )

parse_nav_file

parse_nav_file()

Robust RINEX 3 NAV parser for GPS, Galileo, BeiDou, and GLONASS.

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def parse_nav_file(self):
    """Robust RINEX 3 NAV parser for GPS, Galileo, BeiDou, and GLONASS.

    Reads broadcast ephemerides from ``self.navpath`` and stores them in
    ``self.nav_data`` as ``{sat_id: [ephemeris_dict, ...]}``.  Malformed or
    truncated records are skipped so a single bad entry cannot abort the
    whole parse.  If the file is missing or has no ``END OF HEADER`` marker,
    ``self.nav_data`` is left empty.
    """
    self.nav_data = {}
    if not self.navpath.exists():
        logger.warning(f"NAV file not found: {self.navpath}")
        return

    logger.info(f"Parsing NAV data from {self.navpath.name}")
    with open(self.navpath) as f:
        # Skip the header; bail out entirely if it never terminates.
        header_end = False
        for line in f:
            if "END OF HEADER" in line:
                header_end = True
                break

        if not header_end:
            return

        lines = f.readlines()
        i = 0
        while i < len(lines):
            line = lines[i]
            if not line.strip():
                i += 1
                continue

            sat_id = line[0:3].strip()
            if not sat_id or len(sat_id) < 2:
                i += 1
                continue
            const = sat_id[0]

            # Determine record length based on constellation (RINEX 3 standard)
            # GPS/GAL/BDS usually 8-line records (7 data lines)
            # GLO usually 4-line records (3 data lines)
            n_data_lines = 3 if const == "R" else 7

            try:
                # Collect all lines for this record
                record_lines = lines[i : i + 1 + n_data_lines]
                data_str = ""
                # Header line (line 0) has epoch and first 3 values.
                # Fortran-style 'D'/'d' exponents are normalized to 'E'/'e'.
                data_str += record_lines[0][23:].replace("D", "E").replace("d", "e")
                # Data lines (1 to n)
                for dl in record_lines[1:]:
                    data_str += dl.replace("D", "E").replace("d", "e")

                vals = [float(v) for v in data_str.split()]

                # Parse epoch from header line
                # Format: YYYY MM DD HH mm ss
                # Note: RINEX 3 NAV header lines start with SatID, then YYYY...
                ep_str = record_lines[0][3:23]
                parts = ep_str.split()
                epoch = datetime(
                    int(parts[0]),
                    int(parts[1]),
                    int(parts[2]),
                    int(parts[3]),
                    int(parts[4]),
                    int(float(parts[5])),
                )

                if const in "GEC":  # GPS, Galileo, BeiDou
                    if len(vals) < 21:
                        i += 1 + n_data_lines
                        continue
                    eph = {
                        "epoch": epoch,
                        "af0": vals[0],
                        "af1": vals[1],
                        "af2": vals[2],
                        "iode": vals[3],
                        "crs": vals[4],
                        "delta_n": vals[5],
                        "M0": vals[6],
                        "cuc": vals[7],
                        "e": vals[8],
                        "cus": vals[9],
                        "sqrt_a": vals[10],
                        "toe": vals[11],
                        "cic": vals[12],
                        "omega0": vals[13],
                        "cis": vals[14],
                        "i0": vals[15],
                        "crc": vals[16],
                        "omega": vals[17],
                        "omega_dot": vals[18],
                        "i_dot": vals[19],
                    }
                elif const == "R":  # GLONASS
                    # BUGFIX: a complete GLONASS record carries 15 values
                    # (vals[0]..vals[14] are read below, up to "age").
                    # The previous `< 12` guard let short records reach the
                    # dict build and raise IndexError (silently swallowed).
                    if len(vals) < 15:
                        i += 1 + n_data_lines
                        continue
                    eph = {
                        "epoch": epoch,
                        "tau_n": vals[0],
                        "gamma_n": vals[1],
                        "tk": vals[2],
                        # Positions/velocities/accelerations arrive in km
                        # and are converted to meters here.
                        "x": vals[3] * 1000.0,
                        "vx": vals[4] * 1000.0,
                        "ax": vals[5] * 1000.0,
                        "health": vals[6],
                        "y": vals[7] * 1000.0,
                        "vy": vals[8] * 1000.0,
                        "ay": vals[9] * 1000.0,
                        "freq_num": vals[10],
                        "z": vals[11] * 1000.0,
                        "vz": vals[12] * 1000.0,
                        "az": vals[13] * 1000.0,
                        "age": vals[14],
                    }
                else:
                    i += 1 + n_data_lines
                    continue

                if sat_id not in self.nav_data:
                    self.nav_data[sat_id] = []
                self.nav_data[sat_id].append(eph)

            except Exception:
                # Best-effort parse: ignore malformed records and move on.
                pass

            i += 1 + n_data_lines

compute_satellite_azel

compute_satellite_azel()

Propagates satellite positions for precise Az/El calculation.

Uses broadcast ephemeris to calculate satellite positions and compute azimuth/elevation angles from receiver position.

Examples:

    >>> analyzer = RINEXAnalyzer('file.obs', navpath='file.nav')
    >>> analyzer.parse_obs_file()
    >>> analyzer.parse_nav_file()
    >>> analyzer.compute_satellite_azel()
    >>> print(analyzer.azel_df.head())

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def compute_satellite_azel(self):
    """Propagates satellite positions for precise Az/El calculation.

    Uses broadcast ephemeris to calculate satellite positions and
    compute azimuth/elevation angles from receiver position.

    Results are stored in ``self.azel_df`` with columns
    time / satellite / azimuth / elevation (degrees).

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs', navpath='file.nav')
        >>> analyzer.parse_obs_file()
        >>> analyzer.parse_nav_file()
        >>> analyzer.compute_satellite_azel()
        >>> print(analyzer.azel_df.head())
    """
    # Fall back to mock geometry when either NAV data or the receiver's
    # approximate position (from the OBS header) is unavailable.
    if not hasattr(self, "nav_data") or not self.nav_data:
        logger.warning("No NAV data loaded. Falling back to mock geometry")
        return self._mock_azel()

    logger.info("Computing precise Az/El from NAV ephemeris")
    receiver_pos = self.header_info.get("position")
    if not receiver_pos:
        logger.warning("Receiver position unknown. Mocking Az/El")
        return self._mock_azel()

    # Constants
    # GM: Earth's gravitational parameter [m^3/s^2] (GPS ICD value);
    # OMEGA_E: Earth rotation rate [rad/s].
    GM = 3.986005e14
    OMEGA_E = 7.2921151467e-5

    azel_list = []
    for sat in self.df_obs["satellite"].unique():
        if sat not in self.nav_data:
            continue

        # Sort ephemeris by epoch
        eph_list = sorted(self.nav_data[sat], key=lambda x: x["epoch"])

        for t in self.epochs:
            # Find closest ephemeris (within 4 hours)
            closest = min(
                eph_list, key=lambda e: abs((e["epoch"] - t).total_seconds())
            )
            # NOTE(review): dt is measured from the ephemeris record epoch,
            # not from toe — presumably close enough here; confirm.
            dt = (t - closest["epoch"]).total_seconds()
            if abs(dt) > 14400:
                continue

            try:
                if sat[0] in "GEC":
                    # Keplerian Propagation
                    e = closest["e"]
                    a = closest["sqrt_a"] ** 2
                    # Corrected mean motion and mean anomaly at time t.
                    n = np.sqrt(GM / a**3) + closest["delta_n"]
                    M = closest["M0"] + n * dt

                    # Solve Kepler's equation M = E - e*sin(E) by
                    # fixed-point iteration (10 passes).
                    E = M
                    for _ in range(10):
                        E = M + e * np.sin(E)

                    # True anomaly and argument of latitude.
                    v = 2 * np.arctan2(
                        np.sqrt(1 + e) * np.sin(E / 2),
                        np.sqrt(1 - e) * np.cos(E / 2),
                    )
                    phi = v + closest["omega"]

                    # Second-harmonic perturbation corrections
                    # (argument of latitude, radius, inclination).
                    du = closest["cus"] * np.sin(2 * phi) + closest["cuc"] * np.cos(
                        2 * phi
                    )
                    dr = closest["crs"] * np.sin(2 * phi) + closest["crc"] * np.cos(
                        2 * phi
                    )
                    di = closest["cis"] * np.sin(2 * phi) + closest["cic"] * np.cos(
                        2 * phi
                    )

                    u = phi + du
                    r = a * (1 - e * np.cos(E)) + dr
                    i = closest["i0"] + di + closest["i_dot"] * dt

                    # Orbital-plane position, then corrected longitude of
                    # ascending node (accounts for Earth rotation).
                    x_op, y_op = r * np.cos(u), r * np.sin(u)
                    omega = (
                        closest["omega0"]
                        + (closest["omega_dot"] - OMEGA_E) * dt
                        - OMEGA_E * closest["toe"]
                    )

                    # Rotate orbital-plane coordinates into ECEF.
                    x, y, z = (
                        x_op * np.cos(omega) - y_op * np.cos(i) * np.sin(omega),
                        x_op * np.sin(omega) + y_op * np.cos(i) * np.cos(omega),
                        y_op * np.sin(i),
                    )
                else:  # GLONASS Simplified Linear
                    # Second-order Taylor expansion of the broadcast
                    # position/velocity/acceleration state.
                    x = (
                        closest["x"]
                        + closest["vx"] * dt
                        + 0.5 * closest["ax"] * dt**2
                    )
                    y = (
                        closest["y"]
                        + closest["vy"] * dt
                        + 0.5 * closest["ay"] * dt**2
                    )
                    z = (
                        closest["z"]
                        + closest["vz"] * dt
                        + 0.5 * closest["az"] * dt**2
                    )
                    # Earth rotation correction for GLO
                    angle = OMEGA_E * dt
                    x_rot = x * np.cos(angle) + y * np.sin(angle)
                    y_rot = -x * np.sin(angle) + y * np.cos(angle)
                    x, y = x_rot, y_rot

                # ENU Conversion
                # Line-of-sight vector receiver -> satellite in ECEF.
                dx, dy, dz = (
                    x - receiver_pos[0],
                    y - receiver_pos[1],
                    z - receiver_pos[2],
                )
                # Single-step geodetic latitude approximation; the constant
                # is presumably the WGS-84 first eccentricity squared
                # (~0.00669438) — no iteration is performed.
                p = np.sqrt(receiver_pos[0] ** 2 + receiver_pos[1] ** 2)
                lon = np.arctan2(receiver_pos[1], receiver_pos[0])
                lat = np.arctan2(receiver_pos[2], p * (1 - 0.00669437999))

                # Project line-of-sight onto local East/North/Up axes.
                e_enu = -np.sin(lon) * dx + np.cos(lon) * dy
                n_enu = (
                    -np.sin(lat) * np.cos(lon) * dx
                    - np.sin(lat) * np.sin(lon) * dy
                    + np.cos(lat) * dz
                )
                u_val = (
                    np.cos(lat) * np.cos(lon) * dx
                    + np.cos(lat) * np.sin(lon) * dy
                    + np.sin(lat) * dz
                )

                # Azimuth clockwise from North in [0, 360); elevation above horizon.
                az = np.rad2deg(np.arctan2(e_enu, n_enu)) % 360
                el = np.rad2deg(np.arctan2(u_val, np.sqrt(e_enu**2 + n_enu**2)))
                azel_list.append(
                    {"time": t, "satellite": sat, "azimuth": az, "elevation": el}
                )
            except Exception:
                # Best-effort: skip epochs/satellites whose propagation fails.
                pass

    self.azel_df = pl.DataFrame(azel_list)

get_snr

get_snr()

Extract SNR observations from parsed data.

Returns: DataFrame containing only SNR ('S') observations

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> snr_df = analyzer.get_snr() >>> print(f"SNR observations: {len(snr_df)}")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_snr(self):
    """Return only the SNR observations from the parsed data.

    Returns:
        DataFrame restricted to rows whose obs_type is 'S'

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> snr_df = analyzer.get_snr()
        >>> print(f"SNR observations: {len(snr_df)}")
    """
    is_snr = pl.col("obs_type") == "S"
    return self.df_obs.filter(is_snr)

get_snr_statistics

get_snr_statistics()

Calculate SNR statistics per satellite and frequency.

Returns: DataFrame with mean, std, and count grouped by satellite and frequency

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> stats = analyzer.get_snr_statistics() >>> gps_l1 = stats.filter(pl.col('frequency') == 'L1') >>> print(gps_l1)

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_snr_statistics(self):
    """Calculate SNR statistics per satellite and frequency.

    Returns:
        DataFrame with mean, std, and count grouped by satellite and frequency

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> stats = analyzer.get_snr_statistics()
        >>> gps_l1 = stats.filter(pl.col('frequency') == 'L1')
        >>> print(gps_l1)
    """
    # Aggregate the SNR values for every (satellite, frequency) pair.
    aggregations = [
        pl.col("value").mean().alias("mean"),
        pl.col("value").std().alias("std"),
        pl.col("value").count().alias("count"),
    ]
    snr = self.get_snr()
    return snr.group_by(["satellite", "frequency"]).agg(aggregations)

get_global_frequency_summary

get_global_frequency_summary() -> DataFrame

Compute global statistics summary for all frequency bands.

Calculates mean/std SNR, multipath RMS, satellite count, and observation count for each frequency band.

Returns: DataFrame with columns: frequency, mean, std, mean_MP_RMS, n_satellites, count

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse() >>> summary = analyzer.get_global_frequency_summary() >>> print(summary.filter(pl.col('frequency') == 'L1'))

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_global_frequency_summary(self) -> pl.DataFrame:
    """Compute global statistics summary for all frequency bands.

    Calculates mean/std SNR, multipath RMS, satellite count,
    and observation count for each frequency band.

    Returns:
        DataFrame with columns: frequency, mean, std, mean_MP_RMS,
        n_satellites, count

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse()
        >>> summary = analyzer.get_global_frequency_summary()
        >>> print(summary.filter(pl.col('frequency') == 'L1'))
    """
    keys = ["constellation", "frequency"]

    # Per-band SNR statistics plus satellite/observation counts.
    snr_summary = self.get_snr().group_by(keys).agg(
        [
            pl.col("value").mean().alias("mean"),
            pl.col("value").std().alias("std"),
            pl.col("satellite").n_unique().alias("n_satellites"),
            pl.col("value").count().alias("count"),
        ]
    )

    mp_rms = self.get_multipath_rms()
    if mp_rms.is_empty():
        # No multipath estimates: emit a null column so the schema is stable.
        return snr_summary.with_columns(pl.lit(None).alias("mean_MP_RMS")).sort(keys)

    # Attach mean multipath RMS per band; bands without MP data stay null.
    mp_summary = mp_rms.group_by(keys).agg(
        pl.col("MP_RMS").mean().alias("mean_MP_RMS")
    )
    return snr_summary.join(mp_summary, on=keys, how="left").sort(keys)

estimate_multipath

estimate_multipath()

Estimate multipath error using code-phase combinations.

Implements RTKLIB multipath algorithm using dual-frequency data to isolate multipath effects from other errors.

Returns: DataFrame with multipath estimates (MP) for each observation

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> mp_df = analyzer.estimate_multipath() >>> high_mp = mp_df.filter(pl.col('MP').abs() > 1.0) >>> print(f"High multipath: {len(high_mp)} observations")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def estimate_multipath(self):
    """Estimate multipath error using code-phase combinations.

    Implements the RTKLIB multipath algorithm (Plot::updateMp) using
    dual-frequency data to isolate multipath effects from other errors:

    1. Calculate reference ionosphere I from available dual-frequency data.
    2. Compute raw MP for all frequencies.
    3. Remove mean bias per arc (separated by jumps or cycle slips).

    Returns:
        DataFrame with multipath estimates (MP) for each observation

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> mp_df = analyzer.estimate_multipath()
        >>> high_mp = mp_df.filter(pl.col('MP').abs() > 1.0)
        >>> print(f"High multipath: {len(high_mp)} observations")
    """
    if self.df_obs.is_empty():
        return pl.DataFrame()

    # 1. Prepare frequency information
    # Get all phase (L) and code (P) obs
    obs = self.df_obs.filter(pl.col("obs_type").is_in(["L", "C", "P"]))
    if obs.is_empty():
        return pl.DataFrame()

    # Pivot obs_type to columns. Ensure unique rows first.
    pivoted = obs.unique(
        subset=["time", "satellite", "frequency", "obs_type"]
    ).pivot(
        on="obs_type",
        index=["time", "satellite", "constellation", "frequency"],
        values="value",
    )

    # Merge Code types (P and C)
    code_cols = [c for c in ["P", "C"] if c in pivoted.columns]
    if not code_cols:
        return pl.DataFrame()

    if len(code_cols) == 2:
        pivoted = pivoted.with_columns(
            pl.col("P").fill_null(pl.col("C")).alias("P_val")
        )
    else:
        pivoted = pivoted.with_columns(pl.col(code_cols[0]).alias("P_val"))

    if "L" not in pivoted.columns:
        return pl.DataFrame()

    # Filter rows with both P and L
    data = pivoted.filter(pl.col("P_val").is_not_null() & pl.col("L").is_not_null())
    if data.is_empty():
        return pl.DataFrame()

    # Join with GNSS frequencies to get Hz values
    freq_rows = []
    unique_sats = data.select(["satellite", "constellation"]).unique()

    for row in unique_sats.iter_rows(named=True):
        sat = row["satellite"]
        const = row["constellation"]
        if const not in GNSS_FREQUENCIES:
            continue

        for band, f_mhz in GNSS_FREQUENCIES[const].items():
            f_hz = f_mhz * 1e6
            # GLONASS FDMA adjustment
            if const == "R":
                slot = self.glo_slots.get(sat)
                if slot is not None:
                    if band == "G1":
                        f_hz = (1602.0 + slot * 0.5625) * 1e6
                    elif band == "G2":
                        f_hz = (1246.0 + slot * 0.4375) * 1e6

            freq_rows.append(
                {
                    "satellite": sat,
                    "constellation": const,
                    "frequency": band,
                    "f_hz": f_hz,
                }
            )

    freq_df = pl.DataFrame(freq_rows)

    data = data.join(freq_df, on=["satellite", "constellation", "frequency"])

    # Phase L in meters
    data = data.with_columns((pl.col("L") * (C / pl.col("f_hz"))).alias("L_m"))

    # 2. Reference Ionosphere I per (time, satellite)
    # Group by (time, sat) and pick first two available frequencies for the reference ionosphere
    dual = (
        data.sort(
            ["time", "satellite", "f_hz"], descending=[False, False, True]
        )  # Sort to pick primary bands
        .group_by(["time", "satellite"])
        .agg(
            [
                pl.col("f_hz").head(2).alias("f_pair"),
                pl.col("L_m").head(2).alias("L_pair"),
            ]
        )
        .filter(pl.col("f_pair").list.len() >= 2)
    )

    if dual.is_empty():
        return pl.DataFrame()

    dual = dual.with_columns(
        [
            pl.col("f_pair").list.get(0).alias("f1"),
            pl.col("f_pair").list.get(1).alias("f2"),
            pl.col("L_pair").list.get(0).alias("L1_m"),
            pl.col("L_pair").list.get(1).alias("L2_m"),
        ]
    )

    # Reference I at f1: I1 = (L1 - L2) / ((f1/f2)**2 - 1)
    dual = dual.with_columns(
        (
            (pl.col("L1_m") - pl.col("L2_m"))
            / ((pl.col("f1") / pl.col("f2")) ** 2 - 1)
        ).alias("I1")
    )

    # Join back to calculate raw MP for ALL frequencies in that epoch
    res = data.join(
        dual.select(["time", "satellite", "f1", "I1"]), on=["time", "satellite"]
    )

    # Raw MP_j = P_j - L_j - 2 * (f1/f_j)^2 * I1
    res = res.with_columns(
        (
            pl.col("P_val")
            - pl.col("L_m")
            - 2 * (pl.col("f1") / pl.col("f_hz")) ** 2 * pl.col("I1")
        ).alias("MP_raw")
    )

    # 3. Bias Removal per Arc (Sequence of continuous observations)
    res = res.sort(["satellite", "frequency", "time"])

    # Break arcs on: Time gap (> 60s) OR Raw MP jump (> 5.0m)
    res = res.with_columns(
        [
            pl.col("time")
            .diff()
            .dt.total_seconds()
            .over(["satellite", "frequency"])
            .fill_null(0)
            .alias("dt"),
            pl.col("MP_raw")
            .diff()
            .abs()
            .over(["satellite", "frequency"])
            .fill_null(0)
            .alias("jump"),
        ]
    )

    res = res.with_columns(
        ((pl.col("dt") > 60) | (pl.col("jump") > 5.0)).alias("is_break")
    )

    res = res.with_columns(
        pl.col("is_break")
        .cast(pl.Int32)
        .cum_sum()
        .over(["satellite", "frequency"])
        .alias("arc_id")
    )

    # Final MP: Subtract mean per arc to remove ambiguity bias
    res = res.with_columns(
        (
            pl.col("MP_raw")
            - pl.col("MP_raw").mean().over(["satellite", "frequency", "arc_id"])
        ).alias("MP")
    )

    return res.select(["time", "satellite", "constellation", "frequency", "MP"])

get_multipath_rms

get_multipath_rms()

Calculate RMS multipath per satellite and frequency.

Returns: DataFrame with MP_RMS values grouped by satellite, constellation, frequency

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> mp_rms = analyzer.get_multipath_rms() >>> print(f"Average L1 multipath: {mp_rms.filter(pl.col('frequency')=='L1')['MP_RMS'].mean():.2f}m")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_multipath_rms(self):
    """Calculate RMS multipath per satellite and frequency.

    Returns:
        DataFrame with MP_RMS values grouped by satellite, constellation, frequency

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> mp_rms = analyzer.get_multipath_rms()
        >>> print(f"Average L1 multipath: {mp_rms.filter(pl.col('frequency')=='L1')['MP_RMS'].mean():.2f}m")
    """
    mp_df = self.estimate_multipath()
    if mp_df.is_empty():
        return pl.DataFrame()
    # Root-mean-square of the per-observation MP estimates.
    rms_expr = (pl.col("MP") ** 2).mean().sqrt().alias("MP_RMS")
    return mp_df.group_by(["satellite", "constellation", "frequency"]).agg(rms_expr)

detect_cycle_slips

detect_cycle_slips(threshold_gf=0.08, threshold_mw=2.5)

Detect cycle slips using dual-frequency combinations.

Uses geometry-free (GF) and Melbourne-Wübbena (MW) combinations to identify carrier phase cycle slips.

Args: threshold_gf: Geometry-free jump threshold in meters (default: 0.08) threshold_mw: Melbourne-Wübbena jump threshold in cycles (default: 2.5)

Returns: DataFrame with detected slips including time, satellite, and type

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> slips = analyzer.detect_cycle_slips(threshold_gf=0.1, threshold_mw=3.0) >>> print(f"Total cycle slips detected: {len(slips)}") >>> gps_slips = slips.filter(pl.col('satellite').str.starts_with('G'))

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def detect_cycle_slips(self, threshold_gf=0.08, threshold_mw=2.5):
    """Detect cycle slips using dual-frequency (GF + MW) combinations.

    Uses geometry-free (GF) and Melbourne-Wübbena (MW) combinations
    to identify carrier phase cycle slips.

    Args:
        threshold_gf: Geometry-free jump threshold in meters (default: 0.08)
        threshold_mw: Melbourne-Wübbena jump threshold in cycles (default: 2.5)

    Returns:
        DataFrame with detected slips including time, satellite, and type

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> slips = analyzer.detect_cycle_slips(threshold_gf=0.1, threshold_mw=3.0)
        >>> print(f"Total cycle slips detected: {len(slips)}")
        >>> gps_slips = slips.filter(pl.col('satellite').str.starts_with('G'))
    """

    # Hoisted out of the satellite loop (it captures no loop state):
    # time/value series for one observation kind on one frequency band.
    def get_t(data, kind, band):
        return data.filter(
            (pl.col("obs_type") == kind) & (pl.col("frequency") == band)
        ).select(["time", "value"])

    slips = []
    for sat in self.df_obs["satellite"].unique().to_list():
        const = sat[0]
        b1, b2 = get_dual_freq_bands(const)
        if not b2 or const not in GNSS_FREQUENCIES:
            continue

        # Carrier wavelengths for both bands plus the wide-lane wavelength.
        f1 = GNSS_FREQUENCIES[const][b1]
        f2 = GNSS_FREQUENCIES[const][b2]
        l1_wl = C / (f1 * 1e6)
        l2_wl = C / (f2 * 1e6)
        wl_wl = C / ((f1 - f2) * 1e6)

        sub = self.df_obs.filter(pl.col("satellite") == sat)

        c1, c2, l1, l2 = (
            get_t(sub, "C", b1),
            get_t(sub, "C", b2),
            get_t(sub, "L", b1),
            get_t(sub, "L", b2),
        )
        if any(d.is_empty() for d in [c1, c2, l1, l2]):
            continue

        # Align the four series on common epochs.
        comb = (
            l1.rename({"value": "L1_cyc"})
            .join(l2.rename({"value": "L2_cyc"}), on="time")
            .join(c1.rename({"value": "C1"}), on="time")
            .join(c2.rename({"value": "C2"}), on="time")
        )

        # GF in meters
        comb = comb.with_columns(
            (pl.col("L1_cyc") * l1_wl - pl.col("L2_cyc") * l2_wl).alias("GF")
        )
        # MW in cycles
        comb = comb.with_columns(
            (
                (
                    (f1 * pl.col("L1_cyc") * l1_wl - f2 * pl.col("L2_cyc") * l2_wl)
                    / (f1 - f2)
                    - (f1 * pl.col("C1") + f2 * pl.col("C2")) / (f1 + f2)
                )
                / wl_wl
            ).alias("MW")
        )

        # Jump detection: epoch-to-epoch differences of both combinations.
        comb = comb.sort("time").with_columns(
            [
                pl.col("GF").diff().abs().alias("jump_gf"),
                pl.col("MW").diff().abs().alias("jump_mw"),
            ]
        )

        hit = comb.filter(
            (pl.col("jump_gf") > threshold_gf) | (pl.col("jump_mw") > threshold_mw)
        )
        if not hit.is_empty():
            # Label which combination(s) triggered: "GF", "MW", or "GFMW".
            hit = hit.with_columns(
                (
                    pl.when(pl.col("jump_gf") > threshold_gf)
                    .then(pl.lit("GF"))
                    .otherwise(pl.lit(""))
                    + pl.when(pl.col("jump_mw") > threshold_mw)
                    .then(pl.lit("MW"))
                    .otherwise(pl.lit(""))
                ).alias("type")
            )
            slips.append(
                hit.select(["time", "type"]).with_columns(
                    pl.lit(sat).alias("satellite")
                )
            )

    return pl.concat(slips) if slips else pl.DataFrame()

get_completeness_metrics

get_completeness_metrics()

Calculate observation completeness percentage.

Computes ratio of actual observations to expected observations assuming dual-frequency tracking for all satellites.

Returns: Completeness percentage (0-100)

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> completeness = analyzer.get_completeness_metrics() >>> print(f"Data completeness: {completeness:.1f}%")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_completeness_metrics(self):
    """Calculate observation completeness percentage.

    Computes the ratio of actual code/phase observations to expected
    observations (epochs * satellites * 2), assuming dual-frequency
    tracking for all satellites.

    Returns:
        Completeness percentage (0-100), capped at 100.

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> completeness = analyzer.get_completeness_metrics()
        >>> print(f"Data completeness: {completeness:.1f}%")
    """
    if not self.epochs or self.df_obs.is_empty():
        return 0.0
    n_epochs = len(self.epochs)
    n_sats = self.df_obs["satellite"].n_unique()
    # Expecting at least 2 bands (L1/L2) per satellite per epoch for RTK
    expected = n_epochs * n_sats * 2
    actual = self.df_obs.filter(pl.col("obs_type").is_in(["L", "C", "P"])).shape[0]
    return min(100.0, (actual / expected) * 100.0) if expected > 0 else 0.0

get_gap_metrics

get_gap_metrics()

Detect data gaps in observation epochs.

Identifies periods where expected observations are missing based on median epoch interval.

Returns: Dictionary with max_gap, n_gaps, and expected_interval

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> gaps = analyzer.get_gap_metrics() >>> print(f"Maximum gap: {gaps['max_gap']:.1f} seconds") >>> print(f"Number of gaps: {gaps['n_gaps']}")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_gap_metrics(self):
    """Detect data gaps in observation epochs.

    Identifies periods where expected observations are missing based
    on the median epoch interval.

    Returns:
        Dictionary with max_gap, n_gaps, and expected_interval

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> gaps = analyzer.get_gap_metrics()
        >>> print(f"Maximum gap: {gaps['max_gap']:.1f} seconds")
        >>> print(f"Number of gaps: {gaps['n_gaps']}")
    """
    if len(self.epochs) < 2:
        # Too few epochs to measure intervals. Include expected_interval
        # so the returned schema matches the normal path.
        return {"max_gap": 0, "n_gaps": 0, "expected_interval": 0}
    diffs = [
        (self.epochs[i + 1] - self.epochs[i]).total_seconds()
        for i in range(len(self.epochs) - 1)
    ]
    # Nominal logging interval estimated as the median difference,
    # which is robust to a few large outlier gaps.
    expected_interval = np.median(diffs)
    # Any interval noticeably longer than nominal (0.1 s tolerance) is a gap.
    gaps = [d for d in diffs if d > expected_interval + 0.1]
    return {
        # len(self.epochs) >= 2 guarantees diffs is non-empty here.
        "max_gap": max(diffs),
        "n_gaps": len(gaps),
        "expected_interval": expected_interval,
    }

get_integrity_metrics

get_integrity_metrics()

Calculate cycle slip rates for data integrity assessment.

Returns: Dictionary with slip_rate (per satellite per hour) and total_slips

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> integrity = analyzer.get_integrity_metrics() >>> print(f"Slip rate: {integrity['slip_rate']:.2f} per sat/hour")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_integrity_metrics(self):
    """Calculate cycle slip rates for data integrity assessment.

    Returns:
        Dictionary with slip_rate (per satellite per hour) and total_slips

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> integrity = analyzer.get_integrity_metrics()
        >>> print(f"Slip rate: {integrity['slip_rate']:.2f} per sat/hour")
    """
    # Check for data BEFORE running slip detection: the previous order
    # invoked detect_cycle_slips() on an empty observation set for nothing.
    if self.df_obs.is_empty():
        return {"slip_rate": 0, "total_slips": 0}

    slips = self.detect_cycle_slips()
    n_sats = self.df_obs["satellite"].n_unique()
    if len(self.epochs) > 1:
        duration_hours = (
            (max(self.epochs) - min(self.epochs)).total_seconds() / 3600.0
        )
    else:
        duration_hours = 1.0
    # Guard against a zero-length session (all epochs identical), which
    # previously raised ZeroDivisionError.
    if duration_hours <= 0:
        duration_hours = 1.0

    total_slips = len(slips)
    slip_rate_per_sat_hour = (
        (total_slips / n_sats / duration_hours) if n_sats > 0 else 0
    )

    return {"slip_rate": slip_rate_per_sat_hour, "total_slips": total_slips}

get_geometric_metrics

get_geometric_metrics()

Assess satellite geometric distribution.

Evaluates azimuth quadrant coverage and elevation spread for current satellite constellation.

Returns: Dictionary with quadrants, el_spread, and diversity_score

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> analyzer.compute_satellite_azel() >>> geom = analyzer.get_geometric_metrics() >>> print(f"Quadrant coverage: {geom['quadrants']}/4") >>> print(f"Diversity score: {geom['diversity_score']:.1f}/100")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_geometric_metrics(self):
    """Assess satellite geometric distribution.

    Evaluates azimuth quadrant coverage and elevation spread
    for the current satellite constellation (latest epoch only).

    Returns:
        Dictionary with quadrants, el_spread, and diversity_score

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> analyzer.compute_satellite_azel()
        >>> geom = analyzer.get_geometric_metrics()
        >>> print(f"Quadrant coverage: {geom['quadrants']}/4")
        >>> print(f"Diversity score: {geom['diversity_score']:.1f}/100")
    """
    if self.azel_df.is_empty():
        return {"quadrants": 0, "el_spread": 0, "diversity_score": 0}

    # Quadrant coverage (0-90, 90-180, 180-270, 270-360)
    # Geometry is assessed on the most recent epoch's sky view only.
    latest_time = self.azel_df["time"].max()
    current_sky = self.azel_df.filter(pl.col("time") == latest_time)

    quads = (
        current_sky.with_columns(
            (pl.col("azimuth") // 90).cast(pl.Int32).alias("quad")
        )["quad"]
        .unique()
        .to_list()
    )

    n_quads = len(quads)
    el_min = current_sky["elevation"].min()
    el_max = current_sky["elevation"].max()

    # Polars min/max return None on all-null columns; treat as zero spread.
    if el_min is None or el_max is None:
        el_spread = 0.0
    else:
        el_spread = abs(float(el_max) - float(el_min))

    # Diversity score: 0-100 based on quadrants (60%) and el spread (40%)
    quad_p = (n_quads / 4.0) * 100
    el_p = min(100, (el_spread / 60.0) * 100)  # 60 deg spread is good

    diversity = float(quad_p * 0.6 + el_p * 0.4)

    return {
        "quadrants": n_quads,
        "el_spread": el_spread,
        "diversity_score": diversity,
    }

get_per_sat_quality_scores

get_per_sat_quality_scores()

Calculates quality scores for each satellite based on % of 'Good' epochs.

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_per_sat_quality_scores(self):
    """Return per-satellite quality scores (% of epochs rated 'Good').

    Delegates to assess_data_quality, which computes the per-satellite
    scores alongside the session assessment for efficiency and consistency.
    """
    return self.assess_data_quality()["sat_scores"]

assess_data_quality

assess_data_quality() -> dict[str, Any]

Comprehensive GNSS data quality assessment.

Implements 4-step quality algorithm evaluating:

1. Good satellite count (>35 dBHz SNR)
2. Sky cell coverage (12 cells)
3. Elevation span (0-90°)
4. Azimuth balance

Returns: Dictionary containing: - score: Overall quality score (0-100) - status_icon: Visual status indicator - metrics: Detailed metric values - red_flags: List of identified issues - sat_scores: Per-satellite quality ratings - epoch_df: Time-series quality metrics

Examples: >>> quality = analyzer.assess_data_quality() >>> print(f"Score: {quality['score']:.1f}/100") >>> for flag in quality['red_flags']: ... print(f"Warning: {flag}")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
def assess_data_quality(self) -> dict[str, Any]:
    """Comprehensive GNSS data quality assessment.

    Calculates session quality based on the 4-Step Algorithm in quality.md,
    strictly epoch-based for both per-satellite and session evaluation:

    1. Good satellite count (>35 dBHz SNR)
    2. Sky cell coverage (12 cells)
    3. Elevation span (0-90°)
    4. Azimuth balance

    Returns:
        Dictionary containing:
        - score: Overall quality score (0-100)
        - status_icon: Visual status indicator
        - metrics: Detailed metric values
        - red_flags: List of identified issues
        - sat_scores: Per-satellite quality ratings
        - epoch_df: Time-series quality metrics

    Examples:
        >>> quality = analyzer.assess_data_quality()
        >>> print(f"Score: {quality['score']:.1f}/100")
        >>> for flag in quality['red_flags']:
        ...     print(f"Warning: {flag}")
    """
    # No observation data at all: return an empty, fully-shaped result.
    if self.df_obs.is_empty():
        return {
            "status": "UNCERTAIN",
            "status_icon": "UNCERTAIN",
            "score": 0,
            "reason": "No observation data",
            "metrics": {
                "avg_good_sats": 0,
                "avg_cells": 0,
                "avg_el_span": 0,
                "avg_balance": 0,
            },
            "epoch_df": pl.DataFrame(),
            "sat_scores": pl.DataFrame(),
            "red_flags": ["No observation data available"],
        }

    # Without az/el (NAV-derived) data only an SNR-based fallback score
    # can be produced.
    if self.azel_df.is_empty():
        logger.warning(
            "NAV file not provided - cannot compute geometric quality metrics"
        )
        # Provide basic SNR/MP quality assessment without geometry
        snr = self.get_snr()
        if snr.is_empty():
            basic_score = 0
            sat_count = 0
        else:
            avg_snr = snr.group_by("time").agg(
                pl.col("value").mean().alias("avg_snr")
            )

            mean_snr = avg_snr.select(pl.col("avg_snr").mean()).item()

            # 45 dB-Hz maps to a full score of 100; None/0 mean -> 0.
            basic_score = (
                float(min(100, (mean_snr / 45.0) * 100)) if mean_snr else 0
            )
            sat_count = snr["satellite"].n_unique()

        return {
            "status": "UNCERTAIN",
            "status_icon": "UNCERTAIN",
            "score": basic_score,
            "reason": "NAV file missing - limited quality assessment",
            "metrics": {
                "avg_good_sats": sat_count,
                "avg_cells": 0,
                "avg_el_span": 0,
                "avg_balance": 0,
            },
            "epoch_df": pl.DataFrame(),
            "sat_scores": pl.DataFrame(),
            "red_flags": [
                "NAV file not provided - geometric quality metrics unavailable"
            ],
        }

    # 1. Prepare Data
    snr = self.get_snr()
    mp_est = self.estimate_multipath()
    lli_df = self.df_obs.filter(pl.col("obs_type") == "L")

    primary_bands = ["L1", "G1", "E1", "B1"]
    secondary_bands = ["L2", "G2", "E5b", "B2"]

    # Primary-band observations require SNR, multipath and LLI to be present.
    obs_l1 = (
        snr.filter(pl.col("frequency").is_in(primary_bands))
        .join(
            mp_est.filter(pl.col("frequency").is_in(primary_bands)).select(
                ["time", "satellite", "MP"]
            ),
            on=["time", "satellite"],
            how="inner",
        )
        .join(
            lli_df.filter(pl.col("frequency").is_in(primary_bands)).select(
                ["time", "satellite", "lli"]
            ),
            on=["time", "satellite"],
            how="inner",
        )
        .rename({"value": "snr_l1", "MP": "mp_l1", "lli": "lli_l1"})
    )

    # Secondary band is optional (left joins, nulls allowed).
    obs_l2 = (
        snr.filter(pl.col("frequency").is_in(secondary_bands))
        .join(
            lli_df.filter(pl.col("frequency").is_in(secondary_bands)).select(
                ["time", "satellite", "lli"]
            ),
            on=["time", "satellite"],
            how="left",
        )
        .rename({"value": "snr_l2", "lli": "lli_l2"})
        .select(["time", "satellite", "snr_l2", "lli_l2"])
    )

    obs_data = obs_l1.join(obs_l2, on=["time", "satellite"], how="left").join(
        self.azel_df, on=["time", "satellite"], how="inner"
    )

    # 2. Mark "GOOD" satellites (Epoch Level)
    # Criteria: SNR > 35 (L1) / 30 (L2), MP < 1.0, LLI == 0, Elevation > 15
    obs_data = obs_data.with_columns(
        (
            (pl.col("snr_l1") > 35)
            & (pl.col("lli_l1") == 0)
            & (
                (pl.col("snr_l2").is_null())
                | ((pl.col("snr_l2") > 30) & (pl.col("lli_l2").fill_null(0) == 0))
            )
            & (pl.col("mp_l1").abs() < 1.0)
            & (pl.col("elevation") > 15)
        ).alias("is_good")
    )

    # 3. Calculate Session Metrics (Epoch by Epoch)
    epoch_stats = []
    for t in self.epochs:
        epoch_obs = obs_data.filter(pl.col("time") == t)
        good_sats = epoch_obs.filter(pl.col("is_good"))
        n_good = good_sats.shape[0]

        if n_good > 0:
            # Step 2: Coverage (4 azimuth quadrants x 3 elevation bins = 12 cells)
            quads = good_sats.with_columns(
                (pl.col("azimuth") // 90).cast(pl.Int32).alias("quad")
            )
            bins = quads.with_columns(
                pl.when(pl.col("elevation") < 30)
                .then(pl.lit("low"))
                .when(pl.col("elevation") < 50)
                .then(pl.lit("mid"))
                .otherwise(pl.lit("high"))
                .alias("ebin")
            )
            cells_covered = bins.select(["quad", "ebin"]).unique().shape[0]

            # Step 3: Geometry
            el_min = good_sats["elevation"].min()
            el_max = good_sats["elevation"].max()

            # Polars min/max return None on all-null columns.
            if el_min is None or el_max is None:
                el_span = 0.0
            else:
                el_span = abs(float(el_max) - float(el_min))

            # Azimuth balance: min/max quadrant population ratio (0..1).
            q_counts = bins.group_by("quad").count()
            if q_counts.shape[0] > 0:
                min_count = q_counts["count"].min()
                max_count = q_counts["count"].max()
                if (
                    min_count is not None
                    and max_count is not None
                    and max_count > 0
                ):
                    balance = float(min_count) / float(max_count)
                else:
                    balance = 0.0
            else:
                balance = 0.0
        else:
            cells_covered = 0
            el_span = 0
            balance = 0

        # Step 4: Final Score per epoch
        # 20 good sats, 12 cells, 45 deg span and perfect balance -> 100.
        s_count = float(min(100.0, (n_good / 20.0) * 100))
        s_cov = float((cells_covered / 12.0) * 100)
        s_el = float(min(100.0, (float(el_span) / 45.0) * 100))
        s_az = float(balance * 100)

        epoch_score = float(
            s_count * 0.40 + s_cov * 0.30 + s_el * 0.15 + s_az * 0.15
        )

        epoch_stats.append(
            {
                "time": t,
                "n_good": n_good,
                "cells": cells_covered,
                "el_span": el_span,
                "balance": balance,
                "score": epoch_score,
            }
        )

    epoch_df = pl.DataFrame(epoch_stats)

    # Handle empty epoch_df case
    if epoch_df.is_empty():
        return {
            "status": "UNCERTAIN",
            "status_icon": "UNCERTAIN",
            "score": 0,
            "metrics": {
                "avg_good_sats": 0,
                "avg_cells": 0,
                "avg_el_span": 0,
                "avg_balance": 0,
            },
            "epoch_df": epoch_df,
            "sat_scores": pl.DataFrame(),
            "red_flags": ["No valid epochs found"],
        }

    session_score = epoch_df["score"].mean()

    # 4. Calculate Per-Satellite Session Scores
    # Defined as the % of epochs where the satellite was marked "GOOD"
    duration_hours = (
        (max(self.epochs) - min(self.epochs)).total_seconds() / 3600.0
        if len(self.epochs) > 1
        else 1.0
    )
    sat_quality = obs_data.group_by("satellite").agg(
        [
            (pl.col("is_good").sum() / pl.col("is_good").count() * 100).alias(
                "total_score"
            ),
            pl.col("snr_l1").mean().alias("snr_l1"),
            pl.col("snr_l2").mean().alias("snr_l2"),
            pl.col("mp_l1").abs().mean().alias("mp_val"),
            # Placeholder for slip counts in the table (LLI is already checked per-epoch)
            (pl.col("lli_l1").sum() + pl.col("lli_l2").fill_null(0).sum()).alias(
                "slip_count"
            ),
        ]
    )

    sat_quality = sat_quality.with_columns(
        pl.when(pl.col("total_score") >= 80)
        .then(pl.lit("Excellent"))
        .when(pl.col("total_score") >= 55)
        .then(pl.lit("Fair"))
        .otherwise(pl.lit("Poor"))
        .alias("rating"),
        (pl.col("slip_count") / duration_hours).alias("slip_rate"),
    ).sort("satellite")

    # 5. Result
    def get_grade(s):
        # Map a 0-100 session score onto a textual grade.
        if s >= 85:
            return "Excellent"
        if s >= 70:
            return "Good"
        if s >= 55:
            return "Fair"
        return "Poor"

    status = get_grade(session_score)

    return {
        "status": status,
        "status_icon": {
            "Excellent": "EXCELLENT",
            "Good": "GOOD",
            "Fair": "FAIR",
            "Poor": "POOR",
        }.get(status),
        "score": session_score if session_score is not None else 0,
        "metrics": {
            "avg_good_sats": epoch_df["n_good"].mean() or 0,
            "avg_cells": epoch_df["cells"].mean() or 0,
            "avg_el_span": epoch_df["el_span"].mean() or 0,
            "avg_balance": epoch_df["balance"].mean() or 0,
        },
        "epoch_df": epoch_df,
        "sat_scores": sat_quality,
        "red_flags": (
            [
                f"Critical quality drop in {len(epoch_df.filter(pl.col('score') < 55))} epochs"
            ]
            if len(epoch_df.filter(pl.col("score") < 55)) > 0
            else []
        ),
    }

get_time_span

get_time_span()

Get observation time span.

Returns: Tuple of (start_datetime, end_datetime)

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> start, end = analyzer.get_time_span() >>> duration = (end - start).total_seconds() / 3600 >>> print(f"Session duration: {duration:.1f} hours")

Source code in pils/analyze/ppkdata/RINEX/analyzer.py
def get_time_span(self):
    """Get observation time span.

    Returns:
        Tuple of (start_datetime, end_datetime); (None, None) when no
        epochs have been parsed.

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> start, end = analyzer.get_time_span()
        >>> duration = (end - start).total_seconds() / 3600
        >>> print(f"Session duration: {duration:.1f} hours")
    """
    if self.epochs:
        return min(self.epochs), max(self.epochs)
    return None, None

RINEXPlotter

RINEXPlotter(analyzer: RINEXAnalyzer)

Visualization engine for RINEX quality analysis.

Creates publication-quality plots for GNSS data quality assessment, including skyplots, time series, histograms, and dashboard views.

Attributes:

Name Type Description
analyzer RINEXAnalyzer

Analyzer instance containing parsed RINEX data

Examples:

>>> analyzer = RINEXAnalyzer('file.obs')
>>> analyzer.parse()
>>> plotter = RINEXPlotter(analyzer)
>>> plotter.plot_all_frequencies_summary('dashboard.png')

Initialize plotter with analyzer instance.

Args: analyzer: RINEXAnalyzer with parsed data

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def __init__(self, analyzer: "RINEXAnalyzer") -> None:
    """Bind this plotter to an analyzer instance.

    Args:
        analyzer: RINEXAnalyzer holding already-parsed RINEX data
    """
    self.analyzer = analyzer

plot_all_frequencies_summary

plot_all_frequencies_summary(save_path: str | None = None) -> None

Generate 2x2 global dashboard of frequency band metrics.

Creates comprehensive dashboard showing: - Average SNR by band - Multipath RMS by band - Observation counts - Standard deviation metrics

Args: save_path: Output file path (PNG format)

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> plotter = RINEXPlotter(analyzer) >>> plotter.plot_all_frequencies_summary('frequencies.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_all_frequencies_summary(self, save_path: str | None = None) -> None:
    """Generate 2x2 global dashboard of frequency band metrics.

    Creates comprehensive dashboard showing:
    - Average SNR by band
    - Multipath RMS by band
    - Observation counts
    - Standard deviation metrics

    Args:
        save_path: Output file path (PNG format)

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> plotter = RINEXPlotter(analyzer)
        >>> plotter.plot_all_frequencies_summary('frequencies.png')
    """
    summary = self.analyzer.get_global_frequency_summary()
    # Nothing to plot when no summary data exists (e.g. file not parsed).
    if summary.is_empty():
        return

    fig, axes = plt.subplots(2, 2, figsize=(16, 10))
    # Transparent figure background for embedding in reports.
    fig.patch.set_alpha(0)

    # 1. Avg SNR by Band
    # Bars colored via the band name's first character (presumably the
    # constellation/system letter) - TODO confirm mapping.
    colors = [
        GNSSColors.get_constellation_color(b[0]) for b in summary["frequency"]
    ]
    axes[0, 0].bar(
        summary["frequency"].to_list(),
        summary["mean"].to_numpy(),
        color=colors,
        alpha=0.8,
    )
    # 35 dB-Hz reference line (same threshold used as "good SNR" elsewhere).
    axes[0, 0].axhline(35, color="red", ls="--")
    axes[0, 0].set_title("Average SNR by Frequency Band", fontweight="bold")
    GNSSColors.apply_theme(axes[0, 0])

    # 2. MP RMS by Band (skipped entirely if no band has a non-null MP RMS)
    valid_mp = summary.filter(pl.col("mean_MP_RMS").is_not_null())
    if not valid_mp.is_empty():
        axes[0, 1].bar(
            valid_mp["frequency"].to_list(),
            valid_mp["mean_MP_RMS"].to_numpy(),
            color="#e377c2",
            alpha=0.8,
        )
        axes[0, 1].set_title("Average Multipath RMS by Band", fontweight="bold")
        GNSSColors.apply_theme(axes[0, 1])

    # 3. Observation Count
    # Sample distinct colors from the qualitative "Paired" colormap,
    # capped at the colormap's own size.
    paired_cmap = plt.get_cmap("Paired")
    colors = [
        paired_cmap(i / paired_cmap.N)
        for i in range(min(len(summary), paired_cmap.N))
    ]
    axes[1, 0].pie(
        summary["count"].to_numpy(),
        labels=summary["frequency"].to_list(),
        autopct="%1.1f%%",
        colors=colors,
    )
    axes[1, 0].set_title("Observation Distribution", fontweight="bold")

    # 4. Satellite Diversity
    axes[1, 1].bar(
        summary["frequency"].to_list(),
        summary["n_satellites"].to_numpy(),
        color=GNSSColors.BEIDOU,
    )
    axes[1, 1].set_title("Satellites Tracked per Band", fontweight="bold")
    GNSSColors.apply_theme(axes[1, 1])

    plt.tight_layout()
    # Save a transparent PNG when a path is given, else show interactively.
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_skyplot_snr

plot_skyplot_snr(pool: str = 'single', save_path: str | None = None) -> None

Generate polar skyplot showing satellite tracks colored by SNR.

Args: pool: Frequency pool - 'single' (L1) or 'dual' (L2) save_path: Output file path (PNG format)

Examples: >>> plotter.plot_skyplot_snr('single', 'skyplot_L1.png') >>> plotter.plot_skyplot_snr('dual', 'skyplot_L2.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_skyplot_snr(
    self, pool: str = "single", save_path: str | None = None
) -> None:
    """Generate polar skyplot showing satellite tracks colored by SNR.

    Args:
        pool: Frequency pool - 'single' (L1) or 'dual' (L2)
        save_path: Output file path (PNG format)

    Examples:
        >>> plotter.plot_skyplot_snr('single', 'skyplot_L1.png')
        >>> plotter.plot_skyplot_snr('dual', 'skyplot_L2.png')
    """
    # Azimuth/elevation data is required for a skyplot; bail out silently.
    if self.analyzer.azel_df.is_empty():
        return
    bands = RTKLIB_bands[pool]
    snr = self.analyzer.get_snr().filter(pl.col("frequency").is_in(bands))
    data = snr.join(self.analyzer.azel_df, on=["time", "satellite"])
    if data.is_empty():
        return

    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111, projection="polar")
    fig.patch.set_alpha(0)
    # Set polar plot properties with try/except for compatibility
    # (polar-only Axes methods may be absent on some type stubs/backends).
    try:
        ax.set_theta_zero_location("N")  # type: ignore
    except AttributeError:
        pass
    try:
        ax.set_theta_direction(-1)  # type: ignore
    except AttributeError:
        pass

    # Use numpy directly
    az = np.deg2rad(data["azimuth"].to_numpy())
    # Radial coordinate is 90 - elevation so zenith plots at the center.
    dist = 90 - data["elevation"].to_numpy()
    sc = ax.scatter(
        az, dist, c=data["value"].to_numpy(), cmap="viridis", s=8, alpha=0.6
    )
    plt.colorbar(sc, ax=ax, label="SNR (dB-Hz)")

    # Labels - label the middle of each track
    for sat in data["satellite"].unique().to_list():
        sub = data.filter(pl.col("satellite") == sat)
        mid = len(sub) // 2
        ax.text(
            np.deg2rad(sub["azimuth"][mid]),
            90 - sub["elevation"][mid],
            sat,
            fontsize=9,
            fontweight="bold",
            bbox=dict(facecolor="white", alpha=0.5, boxstyle="round"),
        )

    ax.set_title(
        f"Skyplot SNR Tracks: {pool.capitalize()} Band Pools",
        pad=30,
        fontweight="bold",
        fontsize=14,
    )
    GNSSColors.apply_theme(ax)
    plt.tight_layout()
    # Save a transparent PNG when a path is given, else show interactively.
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_elevation_dependent_stats

plot_elevation_dependent_stats(pool='single', save_path=None)

Two panels: SNR and MP vs Elevation (Binned Averages, Color-coded by Sat Count).

Args: pool: Frequency pool - 'single' or 'dual' save_path: Output file path

Examples: >>> analyzer = RINEXAnalyzer('file.obs') >>> analyzer.parse_obs_file() >>> analyzer.compute_satellite_azel() >>> plotter = RINEXPlotter(analyzer) >>> plotter.plot_elevation_dependent_stats('single', 'elevation_stats.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_elevation_dependent_stats(self, pool="single", save_path=None):
    """Two panels: SNR and MP vs Elevation (Binned Averages, Color-coded by Sat Count).

    Args:
        pool: Frequency pool - 'single' or 'dual'
        save_path: Output file path

    Examples:
        >>> analyzer = RINEXAnalyzer('file.obs')
        >>> analyzer.parse_obs_file()
        >>> analyzer.compute_satellite_azel()
        >>> plotter = RINEXPlotter(analyzer)
        >>> plotter.plot_elevation_dependent_stats('single', 'elevation_stats.png')
    """
    # Azimuth/elevation data is required; bail out silently when missing.
    if self.analyzer.azel_df.is_empty():
        return
    bands = RTKLIB_bands[pool]

    # 1. Prepare SNR Data
    snr = self.analyzer.get_snr().filter(pl.col("frequency").is_in(bands))
    merged = snr.join(self.analyzer.azel_df, on=["time", "satellite"])

    # Bin by elevation (1 degree bins)
    snr_binned = (
        merged.with_columns((pl.col("elevation").round(0)).alias("el_bin"))
        .group_by("el_bin")
        .agg(
            [
                pl.col("value").mean().alias("avg_snr"),
                pl.col("satellite").n_unique().alias("n_sats"),
            ]
        )
        .sort("el_bin")
    )

    # 2. Prepare MP Data (optional - may be empty)
    mp = self.analyzer.estimate_multipath().filter(pl.col("frequency").is_in(bands))
    mp_binned = pl.DataFrame()
    if not mp.is_empty():
        mp_merged = mp.join(self.analyzer.azel_df, on=["time", "satellite"])
        mp_binned = (
            mp_merged.with_columns((pl.col("elevation").round(0)).alias("el_bin"))
            .group_by("el_bin")
            .agg(
                [
                    # Mean of |MP| per 1-degree elevation bin.
                    pl.col("MP").abs().mean().alias("avg_mp"),
                    pl.col("satellite").n_unique().alias("n_sats"),
                ]
            )
            .sort("el_bin")
        )

    # 3. Plotting
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 7))
    fig.patch.set_alpha(0)

    # Left: SNR vs Elevation (point color encodes satellite count per bin)
    sc1 = ax1.scatter(
        snr_binned["el_bin"].to_numpy(),
        snr_binned["avg_snr"].to_numpy(),
        c=snr_binned["n_sats"].to_numpy(),
        cmap="plasma",
        s=50,
        edgecolor="none",
    )
    ax1.set_xlabel("Elevation (deg)", fontweight="bold")
    ax1.set_ylabel("AVG SNR (dB-Hz)", fontweight="bold")
    ax1.set_title(f"{pool.capitalize()} Pool: SNR vs Elevation", fontweight="bold")
    plt.colorbar(sc1, ax=ax1, label="# Satellites")
    GNSSColors.apply_theme(ax1)

    # Right: Multipath vs Elevation (placeholder text when no MP data)
    if not mp_binned.is_empty():
        sc2 = ax2.scatter(
            mp_binned["el_bin"].to_numpy(),
            mp_binned["avg_mp"].to_numpy(),
            c=mp_binned["n_sats"].to_numpy(),
            cmap="plasma",
            s=50,
            edgecolor="none",
        )
        ax2.set_xlabel("Elevation (deg)", fontweight="bold")
        ax2.set_ylabel("AVG Multipath (m)", fontweight="bold")
        ax2.set_title(
            f"{pool.capitalize()} Pool: MP vs Elevation", fontweight="bold"
        )
        plt.colorbar(sc2, ax=ax2, label="# Satellites")
        GNSSColors.apply_theme(ax2)
    else:
        ax2.text(
            0.5,
            0.5,
            "No Multipath Data Available",
            ha="center",
            transform=ax2.transAxes,
        )
        GNSSColors.apply_theme(ax2)

    plt.tight_layout()
    # Save a transparent PNG when a path is given, else show interactively.
    if save_path:
        plt.savefig(save_path, transparent=True, bbox_inches="tight")
        plt.close()
    else:
        plt.show()

plot_snr_time_series

plot_snr_time_series(satellites, freq_band, save_path=None)

Plot SNR time series for specified satellites and frequency.

Args: satellites: List of satellite IDs (e.g., ['G01', 'G02']) freq_band: Frequency band (e.g., 'L1', 'L2') save_path: Output file path

Examples: >>> plotter = RINEXPlotter(analyzer) >>> plotter.plot_snr_time_series(['G01', 'G05', 'G12'], 'L1', 'snr_gps.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_snr_time_series(self, satellites, freq_band, save_path=None):
    """Plot SNR time series for specified satellites and frequency.

    Args:
        satellites: List of satellite IDs (e.g., ['G01', 'G02'])
        freq_band: Frequency band (e.g., 'L1', 'L2')
        save_path: Output file path

    Examples:
        >>> plotter = RINEXPlotter(analyzer)
        >>> plotter.plot_snr_time_series(['G01', 'G05', 'G12'], 'L1', 'snr_gps.png')
    """
    selection = (pl.col("satellite").is_in(satellites)) & (
        pl.col("frequency") == freq_band
    )
    snr = self.analyzer.get_snr().filter(selection)
    if snr.is_empty():
        return

    fig, ax = plt.subplots(figsize=(14, 7))
    fig.patch.set_alpha(0)
    # One line per requested satellite; satellites without data are skipped.
    for sat in satellites:
        series = snr.filter(pl.col("satellite") == sat)
        if series.is_empty():
            continue
        ax.plot(
            series["time"].to_numpy(),
            series["value"].to_numpy(),
            label=sat,
            alpha=0.8,
            linewidth=1.5,
        )

    ax.set_ylabel("SNR (dB-Hz)", fontweight="bold")
    ax.set_title(
        f"Signal Strength Time Series: {freq_band}", fontweight="bold", fontsize=14
    )
    ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left", ncol=2)
    GNSSColors.apply_theme(ax)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
plot_multipath_time_series

plot_multipath_time_series(satellites, freq_band, save_path=None)

Plot multipath time series for specified satellites.

Args: satellites: List of satellite IDs freq_band: Frequency band save_path: Output file path

Examples: >>> plotter = RINEXPlotter(analyzer) >>> plotter.plot_multipath_time_series(['E01', 'E11'], 'E1', 'mp_galileo.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_multipath_time_series(self, satellites, freq_band, save_path=None):
    """Plot multipath time series for specified satellites.

    Args:
        satellites: List of satellite IDs
        freq_band: Frequency band
        save_path: Output file path

    Examples:
        >>> plotter = RINEXPlotter(analyzer)
        >>> plotter.plot_multipath_time_series(['E01', 'E11'], 'E1', 'mp_galileo.png')
    """
    selection = (pl.col("satellite").is_in(satellites)) & (
        pl.col("frequency") == freq_band
    )
    mp = self.analyzer.estimate_multipath().filter(selection)
    if mp.is_empty():
        return

    fig, ax = plt.subplots(figsize=(14, 7))
    fig.patch.set_alpha(0)
    # One trace per requested satellite; skip satellites without data.
    for sat in satellites:
        track = mp.filter(pl.col("satellite") == sat)
        if track.is_empty():
            continue
        ax.plot(
            track["time"].to_numpy(), track["MP"].to_numpy(), label=sat, alpha=0.7
        )

    ax.set_ylabel("MP (meters)", fontweight="bold")
    ax.set_title(
        f"Multipath Time Series: {freq_band}", fontweight="bold", fontsize=14
    )
    ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left", ncol=2)
    ax.grid(True, alpha=0.2)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()

plot_constellation_histograms

plot_constellation_histograms(const, bands, save_path=None)

Plot SNR histograms for constellation.

Args: const: Constellation code ('G', 'R', 'E', 'C') bands: List of frequency bands save_path: Output file path

Examples: >>> plotter = RINEXPlotter(analyzer) >>> plotter.plot_constellation_histograms('G', ['L1', 'L2'], 'gps_hist.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_constellation_histograms(self, const, bands, save_path=None):
    """Plot SNR histograms for constellation.

    Args:
        const: Constellation code ('G', 'R', 'E', 'C')
        bands: List of frequency bands
        save_path: Output file path

    Examples:
        >>> plotter = RINEXPlotter(analyzer)
        >>> plotter.plot_constellation_histograms('G', ['L1', 'L2'], 'gps_hist.png')
    """
    snr = self.analyzer.get_snr().filter(
        (pl.col("constellation") == const) & (pl.col("frequency").is_in(bands))
    )
    if snr.is_empty():
        return

    fig, axes = plt.subplots(
        1, len(bands), figsize=(14, 5), squeeze=False, sharex=True
    )
    fig.patch.set_alpha(0)
    color = GNSSColors.get_constellation_color(const)
    # One histogram panel per band, all sharing the SNR axis.
    for panel, band in zip(axes[0], bands):
        band_snr = snr.filter(pl.col("frequency") == band)
        panel.hist(
            band_snr["value"].to_numpy(),
            bins=30,
            color=color,
            alpha=0.7,
            edgecolor="black",
        )
        panel.set_title(f"SNR Band {band}", fontweight="bold")
        # 35 dB-Hz reference threshold marker.
        panel.axvline(35, color="red", linestyle="--", alpha=0.5)
        GNSSColors.apply_theme(panel)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()

plot_sat_avg_snr_bar

plot_sat_avg_snr_bar(const, save_path=None)

Plot average SNR bar chart per satellite.

Args: const: Constellation code save_path: Output file path

Examples: >>> plotter = RINEXPlotter(analyzer) >>> plotter.plot_sat_avg_snr_bar('R', 'glonass_snr_bars.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_sat_avg_snr_bar(self, const, save_path=None):
    """Bar chart of mean SNR per satellite, grouped by frequency band.

    Args:
        const: Constellation code
        save_path: Optional output file path; when given the figure is
            saved with a transparent background and closed.

    Examples:
        >>> plotter = RINEXPlotter(analyzer)
        >>> plotter.plot_sat_avg_snr_bar('R', 'glonass_snr_bars.png')
    """
    stats = self.analyzer.get_snr_statistics().filter(
        pl.col("satellite").str.starts_with(const)
    )
    if stats.is_empty():
        return

    sat_ids = sorted(stats["satellite"].unique().to_list())
    band_ids = sorted(stats["frequency"].unique().to_list())

    fig, ax = plt.subplots(figsize=(14, 6))
    fig.patch.set_alpha(0)

    positions = np.arange(len(sat_ids))
    # Total group width of 0.8, split evenly among bands.
    bar_width = 0.8 / len(band_ids)

    for band_idx, band in enumerate(band_ids):
        means = []
        for sat in sat_ids:
            match = stats.filter(
                (pl.col("satellite") == sat) & (pl.col("frequency") == band)
            )
            # Missing satellite/band combinations plot as zero-height bars.
            means.append(0 if match.is_empty() else match["mean"][0])
        ax.bar(
            positions + band_idx * bar_width - 0.4 + bar_width / 2,
            means,
            bar_width,
            color=[GNSSColors.BAND_PRIMARY, GNSSColors.BAND_SECONDARY][
                band_idx % 2
            ],
            label=band,
            alpha=0.8,
        )

    ax.set_xticks(positions)
    ax.set_xticklabels(sat_ids, rotation=45)
    ax.set_ylabel("Mean SNR (dB-Hz)")
    ax.set_title(
        f"Average SNR per {CONSTELLATION_NAMES.get(const, const)} Satellite",
        fontweight="bold",
    )
    ax.legend()
    GNSSColors.apply_theme(ax)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()

plot_cycle_slips

plot_cycle_slips(save_path=None)

Plot cycle slip detections over time.

Args: save_path: Output file path

Examples: >>> plotter = RINEXPlotter(analyzer) >>> plotter.plot_cycle_slips('cycle_slips.png')

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_cycle_slips(self, save_path=None):
    """Plot cycle slip detections over time as a satellite-vs-time scatter.

    Args:
        save_path: Optional output file path; when given the figure is
            saved with a transparent background and closed.

    Examples:
        >>> plotter = RINEXPlotter(analyzer)
        >>> plotter.plot_cycle_slips('cycle_slips.png')
    """
    # NOTE: a stray second string literal (leftover duplicate docstring)
    # used to sit here as dead code; it has been removed.
    slips = self.analyzer.detect_cycle_slips()
    if slips.is_empty():
        return

    fig, ax = plt.subplots(figsize=(14, 8))
    fig.patch.set_alpha(0)

    # Order satellites by constellation letter, then PRN number, so the
    # y-axis groups constellations together.
    sats = sorted(
        slips["satellite"].unique().to_list(),
        key=lambda x: (x[0], int(x[1:]) if x[1:].isdigit() else 0),
    )
    sat_map = {s: i for i, s in enumerate(sats)}

    # Plot by detector type. "GF" and "MW" match via substring so rows
    # tagged "GFMW" appear in both; the combined series matches exactly.
    for t, m, c in [
        ("GF", "x", GNSSColors.SINGLE),
        ("MW", "+", GNSSColors.GPS),
        ("GFMW", "o", GNSSColors.FIX),
    ]:
        subset = slips.filter(
            pl.col("type").str.contains(t)
            if t != "GFMW"
            else pl.col("type") == "GFMW"
        )
        if not subset.is_empty():
            ax.scatter(
                subset["time"].to_numpy(),
                [sat_map[s] for s in subset["satellite"].to_list()],
                marker=m,
                color=c,
                label=f"{t} Slip",
                s=60,
                zorder=10,
            )

    ax.set_yticks(range(len(sats)))
    ax.set_yticklabels(sats)
    ax.set_title(
        "Detected Cycle Slips (Integrity Events)", fontweight="bold", fontsize=14
    )
    ax.legend()
    GNSSColors.apply_theme(ax)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()

plot_good_satellites_trend

plot_good_satellites_trend(epoch_df, save_path=None)

Plots the number of 'Good' satellites per epoch over time.

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_good_satellites_trend(self, epoch_df, save_path=None):
    """Plot the per-epoch count of 'Good' satellites as a filled trend.

    Args:
        epoch_df: DataFrame with 'time' and 'n_good' columns.
        save_path: Optional output file path; saved transparent and
            closed when given, otherwise the figure is shown.
    """
    if epoch_df.is_empty():
        return

    fig, ax = plt.subplots(figsize=(14, 6))
    fig.patch.set_alpha(0)

    times = epoch_df["time"].to_numpy()
    counts = epoch_df["n_good"].to_numpy()

    ax.fill_between(times, counts, color=GNSSColors.FIX, alpha=0.3)
    ax.plot(
        times,
        counts,
        color=GNSSColors.FIX,
        linewidth=2,
        label="Good Satellites",
    )

    # Reference thresholds for constellation health.
    ax.axhline(15, color=GNSSColors.FLOAT, ls="--", alpha=0.6, label="Target (15+)")
    ax.axhline(8, color=GNSSColors.SINGLE, ls=":", alpha=0.6, label="Critical (8)")

    ax.set_ylabel("Satellite Count", fontweight="bold")
    ax.set_title(
        "Constellation Health: 'Good' Satellites Over Time",
        fontweight="bold",
        fontsize=14,
    )
    ax.legend(loc="upper right")
    GNSSColors.apply_theme(ax)

    # Show wall-clock time on the x axis.
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_band_comparison

plot_band_comparison(save_path=None)

Grouped bar chart comparing Primary (L1) vs Secondary (L2) SNR for all constellations.

Source code in pils/analyze/ppkdata/RINEX/plotter.py
def plot_band_comparison(self, save_path=None):
    """Grouped bar chart comparing Primary (L1) vs Secondary (L2) SNR for all constellations."""
    summary = self.analyzer.get_global_frequency_summary()
    if summary.is_empty():
        return

    # Primary/secondary band naming differs per constellation.
    bands_map = {
        "G": ("G1", "G2"),
        "E": ("E1", "E5b"),
        "C": ("B1", "B2"),
        "R": ("G1", "G2"),
    }

    labels = []
    primary_vals = []
    secondary_vals = []

    for code, (band_p, band_s) in bands_map.items():
        sel_p = summary.filter(
            (pl.col("constellation") == code) & (pl.col("frequency") == band_p)
        )["mean"]
        sel_s = summary.filter(
            (pl.col("constellation") == code) & (pl.col("frequency") == band_s)
        )["mean"]

        # Only include constellations with a primary-band measurement;
        # a missing secondary band plots as a zero-height bar.
        if sel_p.is_empty():
            continue
        labels.append(CONSTELLATION_NAMES.get(code, code))
        primary_vals.append(sel_p[0])
        secondary_vals.append(sel_s[0] if not sel_s.is_empty() else 0)

    if not labels:
        return

    x = np.arange(len(labels))
    width = 0.35

    fig, ax = plt.subplots(figsize=(14, 7))
    fig.patch.set_alpha(0)

    ax.bar(
        x - width / 2,
        primary_vals,
        width,
        label="Primary (L1/E1/B1)",
        color=GNSSColors.BAND_PRIMARY,
        alpha=0.9,
    )
    ax.bar(
        x + width / 2,
        secondary_vals,
        width,
        label="Secondary (L2/E5b/B2)",
        color=GNSSColors.BAND_SECONDARY,
        alpha=0.9,
    )

    ax.set_ylabel("Mean SNR (dB-Hz)", fontweight="bold")
    ax.set_title(
        "Primary vs Secondary Signal Strength Comparison",
        fontweight="bold",
        fontsize=16,
    )
    ax.set_xticks(x)
    ax.set_xticklabels(labels, fontweight="bold")
    ax.legend()
    GNSSColors.apply_theme(ax)

    # Annotate bar values on top (skip zero placeholders).
    for series, offset in ((primary_vals, -width / 2), (secondary_vals, width / 2)):
        for i, v in enumerate(series):
            if v > 0:
                ax.text(
                    i + offset, v + 0.5, f"{v:.1f}", ha="center", fontweight="bold"
                )

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

RINEXReport

RINEXReport(rinex_obs: Path | None = None, rinex_nav: Path | None = None, analyzer: RINEXAnalyzer | None = None, plotter: RINEXPlotter | None = None)

Generate comprehensive RINEX quality analysis reports.

Orchestrates RINEXAnalyzer and RINEXPlotter to produce detailed markdown reports with plots analyzing GNSS data quality.

Attributes:

Name Type Description
analyzer RINEXAnalyzer

Analyzer instance for data processing

plotter RINEXPlotter

Plotter instance for visualization

Examples:

>>> report = RINEXReport(rinex_obs=Path('file.obs'), rinex_nav=Path('file.nav'))
>>> report.generate('quality_report.md', plot_folder='plots')
>>> # Creates quality_report.md with plots in plots/ subfolder

Initialize RINEX report generator.

Args: rinex_obs: Path to RINEX observation file (if analyzer not provided) rinex_nav: Path to RINEX navigation file (if analyzer not provided) analyzer: Existing RINEXAnalyzer instance plotter: Existing RINEXPlotter instance

Examples: >>> # From file >>> report = RINEXReport(rinex_obs=Path('station.obs'), rinex_nav=Path('station.nav')) >>> # From existing analyzer >>> analyzer = RINEXAnalyzer('station.obs') >>> analyzer.parse_obs_file() >>> report = RINEXReport(analyzer=analyzer)

Source code in pils/analyze/ppkdata/RINEX/report.py
def __init__(
    self,
    rinex_obs: Path | None = None,
    rinex_nav: Path | None = None,
    analyzer: RINEXAnalyzer | None = None,
    plotter: RINEXPlotter | None = None,
) -> None:
    """Initialize RINEX report generator.

    An existing analyzer/plotter takes precedence; otherwise both are
    built from the given RINEX file paths. With neither, the instance
    is created with ``analyzer``/``plotter`` set to ``None``.

    Args:
        rinex_obs: Path to RINEX observation file (if analyzer not provided)
        rinex_nav: Path to RINEX navigation file (if analyzer not provided)
        analyzer: Existing RINEXAnalyzer instance
        plotter: Existing RINEXPlotter instance

    Examples:
        >>> # From file
        >>> report = RINEXReport(rinex_obs=Path('station.obs'), rinex_nav=Path('station.nav'))
        >>> # From existing analyzer
        >>> analyzer = RINEXAnalyzer('station.obs')
        >>> analyzer.parse_obs_file()
        >>> report = RINEXReport(analyzer=analyzer)
    """
    if analyzer is not None:
        self.analyzer = analyzer
    elif rinex_obs is not None and rinex_nav is not None:
        self.analyzer = RINEXAnalyzer(rinex_obs, navpath=rinex_nav)
        self.analyzer.parse_obs_file()  # Parse the RINEX file
        self.analyzer.parse_nav_file()
    else:
        logger.info("No analyzer nor RINEX file provided")
        self.analyzer = None  # type: ignore

    if plotter is not None:
        self.plotter = plotter
    elif self.analyzer is not None:
        self.plotter = RINEXPlotter(self.analyzer)
    else:
        self.plotter = None  # type: ignore

generate

generate(report_name: str | Path = 'rinex_report.md', plot_folder: str = 'assets') -> str

Generate complete RINEX quality analysis report.

Args: report_name: Output path for markdown report plot_folder: Subfolder name for plot assets

Returns: Path to generated report file

Raises: ValueError: If analyzer or plotter not initialized

Examples: >>> report = RINEXReport(rinex_obs=Path('file.obs'), rinex_nav=Path('file.nav')) >>> report_path = report.generate('rinex_quality.md', plot_folder='figures') >>> print(f"Report generated at: {report_path}")

Source code in pils/analyze/ppkdata/RINEX/report.py
def generate(
    self, report_name: str | Path = "rinex_report.md", plot_folder: str = "assets"
) -> str:
    """Generate complete RINEX quality analysis report.

    Args:
        report_name: Output path for markdown report
        plot_folder: Subfolder name for plot assets

    Returns:
        Path to generated report file

    Raises:
        ValueError: If analyzer or plotter not initialized

    Examples:
        >>> report = RINEXReport(rinex_obs=Path('file.obs'), rinex_nav=Path('file.nav'))
        >>> report_path = report.generate('rinex_quality.md', plot_folder='figures')
        >>> print(f"Report generated at: {report_path}")
    """
    if self.analyzer is None:
        raise ValueError(
            "Analyzer not initialized. Provide rinex file or analyzer instance."
        )
    if self.plotter is None:
        raise ValueError("Plotter not initialized.")

    # Plots are written to a subfolder next to the report so the markdown
    # can reference them with relative links.
    report_path = Path(report_name)
    output_dir = report_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    assets_dir = output_dir / plot_folder
    assets_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Generating RINEX quality report in '{output_dir}'")

    # 0. Data Preparation
    self.analyzer.compute_satellite_azel()
    freq_summary = self.analyzer.get_global_frequency_summary()

    # 1. Header & Quality Scoreboard
    start_time, end_time = self.analyzer.get_time_span()
    quality = self.analyzer.assess_data_quality()

    report = f"# GNSS Quality Analysis: {self.analyzer.obsname}\n\n"
    report += f"**Analysis Date:** {datetime.now():%Y-%m-%d %H:%M:%S}\n"
    if start_time and end_time:
        report += f"**Session Start:** {start_time:%Y-%m-%d %H:%M:%S}\n"
        report += f"**Session End:**   {end_time:%Y-%m-%d %H:%M:%S}\n"
        report += f"**Duration:**      {end_time - start_time}\n\n"

    report += "## Executive Quality Scoreboard\n"
    score = quality["score"]
    status_icon = quality["status_icon"]

    report += f"### Overall Score: **{score:.1f}/100** ({status_icon})\n\n"

    # Red Flags Alert
    if quality["red_flags"]:
        report += "### Red Flags Detected\n"
        for flag in quality["red_flags"]:
            report += f"- {flag}\n"
        report += "\n"

    # 4-Step Algorithm Summary
    m = quality["metrics"]
    report += "#### 🛰️ 4-Step Algorithm Metrics (Session Avg)\n"
    report += "| Good Sats (40%) | Cell Coverage (30%) | Elevation Span (15%) | Azimuth Balance (15%) |\n"
    report += "|---|---|---|---|\n"
    # Metrics may be None when the analyzer could not compute them; show N/A.
    avg_sats = (
        f"{m['avg_good_sats']:.1f}" if m["avg_good_sats"] is not None else "N/A"
    )
    avg_cells = f"{m['avg_cells']:.1f}" if m["avg_cells"] is not None else "N/A"
    avg_el_span = (
        f"{m['avg_el_span']:.1f}°" if m["avg_el_span"] is not None else "N/A"
    )
    avg_balance = (
        f"{m['avg_balance']:.2f}" if m["avg_balance"] is not None else "N/A"
    )
    report += f"| {avg_sats} / 20 | {avg_cells} / 12 | {avg_el_span} | {avg_balance} |\n\n"

    # Good Satellites Trend Plot
    # Plot helpers no-op on empty data, so only embed images that were
    # actually written to disk.
    trend_path = assets_dir / "good_sats_trend.png"
    logger.debug("Generating Good Satellites trend plot")
    self.plotter.plot_good_satellites_trend(quality["epoch_df"], str(trend_path))
    if trend_path.exists():
        report += f"![Good Satellites Trend]({plot_folder}/good_sats_trend.png)\n\n"

    # Fleet Review Table
    report += "### Satellite Quality Fleet Review\n"
    report += "| Sat | Rating | Score | SNR L1 | SNR L2 | MP RMS | Slips/h |\n"
    report += "|---|---|---|---|---|---|---|\n"
    for row in quality["sat_scores"].iter_rows(named=True):
        s1 = (
            f"{row['snr_l1']:.1f}"
            if row["snr_l1"] is not None and row["snr_l1"] > 0
            else "-"
        )
        s2 = (
            f"{row['snr_l2']:.1f}"
            if row["snr_l2"] is not None and row["snr_l2"] > 0
            else "-"
        )
        score_val = (
            f"{row['total_score']:.1f}" if row["total_score"] is not None else "N/A"
        )
        mp_val = f"{row['mp_val']:.3f}" if row["mp_val"] is not None else "N/A"
        slip_val = (
            f"{row['slip_rate']:.1f}" if row["slip_rate"] is not None else "N/A"
        )
        report += f"| {row['satellite']} | {row['rating']} | {score_val} | {s1} | {s2} | {mp_val} | {slip_val} |\n"
    report += "\n"

    # Verdict call-out based on the overall quality score.
    if score > 75:
        report += "> [!NOTE]\n> The data pool is solid. Major constellations are reliable for high-precision GNSS processing.\n\n"
    else:
        report += "> [!CAUTION]\n> High degree of satellite degradation. RTK positions may be biased or suffer from long fix times. Review Fleet Review Table.\n\n"

    # Global Dashboard
    dash_path = assets_dir / "dashboard_global.png"
    logger.debug("Building Global Dashboard")
    self.plotter.plot_all_frequencies_summary(str(dash_path))
    if dash_path.exists():
        report += "## Global Performance Dashboard\n"
        report += f"![Global Dashboard]({plot_folder}/dashboard_global.png)\n\n"

    # Band Comparison Plot
    comp_path = assets_dir / "band_comparison.png"
    logger.debug("Generating Primary vs Secondary comparison plot")
    self.plotter.plot_band_comparison(str(comp_path))
    if comp_path.exists():
        report += f"#### Multi-Band SNR Hierarchy\n![Band Comparison]({plot_folder}/band_comparison.png)\n\n"

    report += "### Frequency Band Metrics\n"
    report += "| Band | Mean SNR | Std SNR | MP RMS (m) | Sats | Observations |\n|---|---|---|---|---|---|\n"
    for row in freq_summary.iter_rows(named=True):
        mean_val = f"{row['mean']:.1f}" if row["mean"] is not None else "N/A"
        std_val = f"{row['std']:.2f}" if row["std"] is not None else "N/A"
        mp_val = (
            f"{row['mean_MP_RMS']:.3f}" if row["mean_MP_RMS"] is not None else "N/A"
        )
        report += f"| {row['frequency']} | {mean_val} | {std_val} | {mp_val} | {row['n_satellites']} | {row['count']} |\n"

    # 2. Pooled Distribution & Elevation Dependency
    pooled_path = assets_dir / "pooled_comparison.png"
    logger.debug("Generating Pooled Distributions")
    self.plotter.plot_global_l1_l2_comparison_hist(str(pooled_path))
    if pooled_path.exists():
        report += "\n## Multi-Constellation Quality Context\n"
        report += f"#### Global SNR Pooled Benchmarking\n![Comparison]({plot_folder}/pooled_comparison.png)\n\n"

    # Skyplot + elevation-dependence sections for each band pool.
    for pool in ["single", "dual"]:
        name = "L1-Band" if pool == "single" else "L2-Band"
        logger.debug(f"Generating {name} context plots")

        # Skyplot
        sky_path = assets_dir / f"sky_{pool}.png"
        self.plotter.plot_skyplot_snr(pool=pool, save_path=str(sky_path))
        if sky_path.exists():
            report += f"### {name} Tracking & Quality\n![Skyplot]({plot_folder}/sky_{pool}.png)\n\n"

        # Elevation Dependence
        el_path = assets_dir / f"elevation_{pool}.png"
        self.plotter.plot_elevation_dependent_stats(
            pool=pool, save_path=str(el_path)
        )
        if el_path.exists():
            report += f"#### Elevation Dependency (SNR & MP)\n![Elevation]({plot_folder}/elevation_{pool}.png)\n\n"

    # 3. Detailed Constellation Performance
    report += "## Constellation-Specific Analysis\n"
    constellations = sorted(
        self.analyzer.df_obs["constellation"].unique().to_list()
    )
    for const in constellations:
        cname = CONSTELLATION_NAMES.get(const, const)

        # Try to generate per-constellation plots first to see if we should add the header
        bar_path = assets_dir / f"bar_{const}.png"
        hist_path = assets_dir / f"hist_{const}.png"

        self.plotter.plot_sat_avg_snr_bar(const, str(bar_path))
        self.plotter.plot_constellation_histograms(
            const,
            sorted(
                self.analyzer.df_obs.filter(pl.col("constellation") == const)[
                    "frequency"
                ]
                .unique()
                .to_list()
            ),
            str(hist_path),
        )

        if bar_path.exists() or hist_path.exists():
            report += f"### {cname} Performance Review\n"
            if bar_path.exists():
                report += f"#### Average SNR per Spacecraft\n![Bar]({plot_folder}/bar_{const}.png)\n\n"
            if hist_path.exists():
                report += f"#### Quality Distribution by Band\n![Hist]({plot_folder}/hist_{const}.png)\n\n"

        # Detailed Time Series
        bands = sorted(
            self.analyzer.df_obs.filter(pl.col("constellation") == const)[
                "frequency"
            ]
            .unique()
            .to_list()
        )
        for band in bands:
            logger.debug(f"Detailed plots for {cname} {band}")
            sats = sorted(
                self.analyzer.df_obs.filter(
                    (pl.col("constellation") == const)
                    & (pl.col("frequency") == band)
                )["satellite"]
                .unique()
                .to_list()
            )

            # SNR Time Series
            img_snr = f"ts_snr_{const}_{band}.png"
            snr_path = assets_dir / img_snr
            self.plotter.plot_snr_time_series(sats, band, str(snr_path))
            if snr_path.exists():
                report += f"#### Band {band} Time Series\n![SNR]({plot_folder}/{img_snr})\n\n"

            # Multipath Time Series
            img_mp = f"ts_mp_{const}_{band}.png"
            mp_path = assets_dir / img_mp
            self.plotter.plot_multipath_time_series(sats, band, str(mp_path))
            if mp_path.exists():
                report += f"![MP]({plot_folder}/{img_mp})\n\n"

    # 4. Signal Integrity & Reliability
    slip_path = assets_dir / "cycle_slips.png"
    logger.debug("Generating Integrity Dashboards")
    self.plotter.plot_cycle_slips(str(slip_path))
    if slip_path.exists():
        report += "## Signal Integrity Monitoring\n"
        report += f"### Cycle Slip Event Detection (GF/MW Combined)\n![Slips]({plot_folder}/cycle_slips.png)\n"

    # Write the assembled markdown in one shot.
    with open(report_path, "w") as f:
        f.write(report)

    logger.info(f"Report generated: {report_path}")
    return str(report_path)

PPK Analysis

POSAnalyzer

POSAnalyzer(filepath: str | Path)

Polars-based analyzer for RTKLIB .pos solution files.

Parses and analyzes RTK position solutions from RTKLIB, computing statistics and ENU offsets.

Attributes:

Name Type Description
filepath str

Path to .pos file

df DataFrame

Parsed position data

header list

Header lines from .pos file

Examples:

>>> analyzer = POSAnalyzer('solution.pos')
>>> df = analyzer.parse()
>>> stats = analyzer.get_statistics()
>>> print(f"Fix rate: {stats['fix_rate']:.1f}%")

Initialize position analyzer.

Args: filepath: Path to RTKLIB .pos file

Source code in pils/analyze/ppkdata/PPK/pos_analyzer.py
def __init__(self, filepath: str | Path) -> None:
    """Initialize position analyzer.

    Parsing is deferred: call :meth:`parse` to load the file.

    Args:
        filepath: Path to RTKLIB .pos file
    """
    self.filepath = filepath
    # Parsed solution data; empty until parse() succeeds.
    self.df = pl.DataFrame()
    # Header lines from the .pos file (populated by parse()).
    self.header = []

parse

parse() -> DataFrame

Parse .pos file into Polars DataFrame.

Reads RTKLIB position solution file and converts to structured DataFrame with time, coordinates, quality indicators, and computed ENU offsets.

Returns: DataFrame with columns: time, lat, lon, height, Q, ns, sdn, sde, sdu, age, ratio, E, N, U

Raises: FileNotFoundError: If .pos file doesn't exist

Examples: >>> analyzer = POSAnalyzer('rtk.pos') >>> df = analyzer.parse() >>> fix_epochs = df.filter(pl.col('Q') == 1) >>> print(f"Fix epochs: {len(fix_epochs)}")

Source code in pils/analyze/ppkdata/PPK/pos_analyzer.py
def parse(self) -> pl.DataFrame:
    """Parse .pos file into Polars DataFrame.

    Reads RTKLIB position solution file and converts to structured
    DataFrame with time, coordinates, quality indicators, and
    computed ENU offsets. Header lines ('%'-prefixed) are kept in
    ``self.header``.

    Returns:
        DataFrame with columns: time, lat, lon, height, Q, ns,
        sdn, sde, sdu, age, ratio, E, N, U

    Raises:
        FileNotFoundError: If .pos file doesn't exist

    Examples:
        >>> analyzer = POSAnalyzer('rtk.pos')
        >>> df = analyzer.parse()
        >>> fix_epochs = df.filter(pl.col('Q') == 1)
        >>> print(f"Fix epochs: {len(fix_epochs)}")
    """
    if not Path(self.filepath).exists():
        raise FileNotFoundError(f"POS file not found: {self.filepath}")

    with open(self.filepath) as f:
        lines = f.readlines()

    # Extract header and data. The class documents `self.header` as the
    # header lines, but they were previously discarded — keep them now.
    self.header = [line for line in lines if line.startswith("%")]
    data_lines = [line for line in lines if not line.startswith("%")]

    # Simple parser for the standard RTKLIB POS format
    # Format: GPST, latitude(deg), longitude(deg), height(m), Q, ns, sdn(m), sde(m), sdu(m), sdne(m), sdeu(m), sdun(m), age(s), ratio

    records = []
    for line in data_lines:
        parts = line.split()
        if len(parts) >= 15:
            try:
                records.append(
                    {
                        "time": f"{parts[0]} {parts[1]}",
                        "lat": float(parts[2]),
                        "lon": float(parts[3]),
                        "height": float(parts[4]),
                        "Q": int(parts[5]),
                        "ns": int(parts[6]),
                        "sdn": float(parts[7]),
                        "sde": float(parts[8]),
                        "sdu": float(parts[9]),
                        "age": float(parts[13]),
                        "ratio": float(parts[14]),
                    }
                )
            except (ValueError, IndexError):
                # Skip malformed rows rather than aborting the parse.
                continue

    if not records:
        return pl.DataFrame()

    self.df = pl.DataFrame(records).with_columns(
        [pl.col("time").str.to_datetime("%Y/%m/%d %H:%M:%S%.f")]
    )

    # Compute ENU Offsets if we have data
    if not self.df.is_empty():
        self._compute_enu()

    return self.df

get_statistics

get_statistics()

Calculate position statistics (fix rate, avg ratio, etc.).

Returns: Dictionary with processing quality metrics: - total_epochs: Total number of position solutions - fix_epochs: Number of fixed ambiguity solutions - float_epochs: Number of float ambiguity solutions - single_epochs: Number of single point solutions - fix_rate: Percentage of fixed solutions - avg_ratio: Average ambiguity ratio - max_ratio: Maximum ambiguity ratio

Examples: >>> analyzer = POSAnalyzer('solution.pos') >>> analyzer.parse() >>> stats = analyzer.get_statistics() >>> print(f"Fix rate: {stats['fix_rate']:.1f}%") >>> print(f"Average ratio: {stats['avg_ratio']:.2f}") >>> if stats['fix_rate'] < 95: ... print("Warning: Low fix rate detected")

Source code in pils/analyze/ppkdata/PPK/pos_analyzer.py
def get_statistics(self):
    """Calculate position statistics (fix rate, avg ratio, etc.).

    Returns:
        Dictionary with processing quality metrics (empty dict when no
        data has been parsed):
        - total_epochs: Total number of position solutions
        - fix_epochs: Number of fixed ambiguity solutions (Q=1)
        - float_epochs: Number of float ambiguity solutions (Q=2)
        - single_epochs: Number of single point solutions (Q=5)
        - fix_rate: Percentage of fixed solutions
        - avg_ratio: Average ambiguity ratio
        - avg_ns: Average number of satellites used
        - max_ratio / min_ratio: Ambiguity ratio extremes

    Examples:
        >>> analyzer = POSAnalyzer('solution.pos')
        >>> analyzer.parse()
        >>> stats = analyzer.get_statistics()
        >>> print(f"Fix rate: {stats['fix_rate']:.1f}%")
        >>> print(f"Average ratio: {stats['avg_ratio']:.2f}")
        >>> if stats['fix_rate'] < 95:
        ...     print("Warning: Low fix rate detected")
    """
    # NOTE: a stray duplicate docstring (dead string statement) was
    # removed here; counting is also simplified from a two-pass
    # value_counts + membership test to direct filtered row counts.
    if self.df.is_empty():
        return {}

    total_epochs = len(self.df)

    def _count_q(q: int) -> int:
        # Number of epochs carrying the given RTKLIB quality flag.
        return self.df.filter(pl.col("Q") == q).height

    fix_count = _count_q(1)
    float_count = _count_q(2)
    single_count = _count_q(5)

    return {
        "total_epochs": total_epochs,
        "fix_epochs": fix_count,
        "float_epochs": float_count,
        "single_epochs": single_count,
        "fix_rate": (fix_count / total_epochs) * 100 if total_epochs > 0 else 0,
        "avg_ratio": self.df["ratio"].mean(),
        "avg_ns": self.df["ns"].mean(),
        "max_ratio": self.df["ratio"].max(),
        "min_ratio": self.df["ratio"].min(),
    }

STATAnalyzer

STATAnalyzer(filepath: str | Path)

Polars-based analyzer for RTKLIB .stat residual files.

Parses RTKLIB processing statistics including phase/code residuals, SNR values, cycle slips, and rejection statistics.

Attributes:

Name Type Description
filepath str

Path to .pos.stat file

df DataFrame

Parsed statistics data

Examples:

>>> analyzer = STATAnalyzer('solution.pos.stat')
>>> df = analyzer.parse()
>>> sat_stats = analyzer.get_satellite_stats()
>>> print(sat_stats.head())

Initialize statistics analyzer.

Args: filepath: Path to RTKLIB .stat file

Source code in pils/analyze/ppkdata/PPK/stat_analyzer.py
def __init__(self, filepath: str | Path) -> None:
    """Initialize statistics analyzer.

    Parsing is deferred: call :meth:`parse` to load the file.

    Args:
        filepath: Path to RTKLIB .stat file
    """
    self.filepath = filepath
    # Parsed $SAT records; empty until parse() succeeds.
    self.df = pl.DataFrame()

parse

parse() -> DataFrame

Parse $SAT lines from .stat file into Polars DataFrame.

Extracts satellite-level processing statistics including: - Phase and code residuals - SNR measurements - Cycle slip counts - Rejection counts - Lock status

Returns: DataFrame with columns: tow, satellite, frequency, azimuth, elevation, resid_code, resid_phase, snr, slip, lock, reject

Raises: FileNotFoundError: If .stat file doesn't exist

Examples: >>> analyzer = STATAnalyzer('rtk.pos.stat') >>> df = analyzer.parse() >>> gps_stats = df.filter(pl.col('satellite').str.starts_with('G')) >>> print(f"GPS observations: {len(gps_stats)}")

Source code in pils/analyze/ppkdata/PPK/stat_analyzer.py
def parse(self) -> pl.DataFrame:
    """Parse $SAT lines from .stat file into Polars DataFrame.

    Extracts satellite-level processing statistics including:
    - Phase and code residuals
    - SNR measurements
    - Cycle slip counts
    - Rejection counts
    - Lock status

    Returns:
        DataFrame with columns: tow, satellite, frequency, azimuth,
        elevation, resid_code, resid_phase, snr, slip, lock, reject
        (plus a derived 'constellation' column)

    Raises:
        FileNotFoundError: If .stat file doesn't exist

    Examples:
        >>> analyzer = STATAnalyzer('rtk.pos.stat')
        >>> df = analyzer.parse()
        >>> gps_stats = df.filter(pl.col('satellite').str.starts_with('G'))
        >>> print(f"GPS observations: {len(gps_stats)}")
    """
    if not Path(self.filepath).exists():
        raise FileNotFoundError(f"STAT file not found: {self.filepath}")

    # RTKLIB $SAT record layout (comma separated):
    # 0:$SAT 1:week 2:tow 3:sat 4:freq 5:az 6:el 7:resp 8:resc 9:vs
    # 10:snr 11:fix 12:slip 13:lock 14:outc 15:slipc 16:rejc
    records = []
    with open(self.filepath) as fh:
        for raw in fh:
            if not raw.startswith("$SAT"):
                continue
            fields = raw.strip().split(",")
            try:
                record = {
                    "tow": float(fields[2]),
                    "satellite": fields[3],
                    "frequency": int(fields[4]),
                    "azimuth": float(fields[5]),
                    "elevation": float(fields[6]),
                    "resid_code": float(fields[7]),
                    "resid_phase": float(fields[8]),
                    "snr": float(fields[10]),
                    "slip": int(fields[12]),
                    "lock": int(fields[13]),
                    "reject": int(fields[16]),
                }
            except (ValueError, IndexError):
                # Malformed record: skip it.
                continue
            records.append(record)

    if not records:
        return pl.DataFrame()

    # First character of the satellite id is the constellation code.
    constellation = pl.col("satellite").str.slice(0, 1).alias("constellation")
    self.df = pl.DataFrame(records).with_columns([constellation])

    return self.df

get_satellite_stats

get_satellite_stats()

Aggregates residuals and SNR per satellite.

Returns: DataFrame with per-satellite statistics including: - avg_snr: Mean signal-to-noise ratio - avg_resid_phase: Mean absolute phase residual - avg_resid_code: Mean absolute code residual - p95_resid_phase: 95th percentile phase residual - total_slips: Total cycle slips detected - total_rejects: Total rejected observations

Examples: >>> analyzer = STATAnalyzer('solution.pos.stat') >>> analyzer.parse() >>> sat_stats = analyzer.get_satellite_stats() >>> gps_stats = sat_stats.filter(pl.col('satellite').str.starts_with('G')) >>> print(f"GPS satellites: {gps_stats.height}") >>> high_residual = sat_stats.filter(pl.col('avg_resid_phase') > 0.05)

Source code in pils/analyze/ppkdata/PPK/stat_analyzer.py
def get_satellite_stats(self):
    """Aggregates residuals and SNR per satellite.

    Returns:
        DataFrame with per-satellite statistics including:
        - avg_snr: Mean signal-to-noise ratio
        - avg_resid_phase: Mean absolute phase residual
        - avg_resid_code: Mean absolute code residual
        - p95_resid_phase: 95th percentile phase residual
        - total_slips: Total cycle slips detected
        - total_rejects: Total rejected observations

    Examples:
        >>> analyzer = STATAnalyzer('solution.pos.stat')
        >>> analyzer.parse()
        >>> sat_stats = analyzer.get_satellite_stats()
        >>> gps_stats = sat_stats.filter(pl.col('satellite').str.starts_with('G'))
        >>> print(f"GPS satellites: {gps_stats.height}")
        >>> high_residual = sat_stats.filter(pl.col('avg_resid_phase') > 0.05)
    """
    # Nothing to aggregate if parse() produced no $SAT records.
    if self.df.is_empty():
        return pl.DataFrame()

    # One row per (satellite, band); residuals are aggregated on their
    # absolute value so positive and negative errors do not cancel out.
    return (
        self.df.group_by(["satellite", "frequency"])
        .agg(
            [
                pl.col("snr").mean().alias("avg_snr"),
                pl.col("resid_phase").abs().mean().alias("avg_resid_phase"),
                pl.col("resid_code").abs().mean().alias("avg_resid_code"),
                pl.col("resid_phase").abs().quantile(0.95).alias("p95_resid_phase"),
                pl.col("slip").sum().alias("total_slips"),
                pl.col("reject").sum().alias("total_rejects"),
            ]
        )
        .sort(["satellite", "frequency"])
    )

get_global_stats

get_global_stats()

Aggregates residuals and SNR per frequency band.

Returns:

Type Description
DataFrame

DataFrame with global statistics by frequency: - mean_snr: Average SNR for the frequency - mean_resid_phase: Average phase residual - mean_resid_code: Average code residual

Examples:

>>> analyzer = STATAnalyzer('solution.pos.stat')
>>> analyzer.parse()
>>> global_stats = analyzer.get_global_stats()
>>> print(global_stats)
Source code in pils/analyze/ppkdata/PPK/stat_analyzer.py
def get_global_stats(self):
    """Compute per-frequency-band aggregates of SNR and residuals.

    Returns
    -------
    pl.DataFrame
        One row per frequency band with:
        - mean_snr: Average SNR for the band
        - mean_resid_phase: Average absolute phase residual
        - mean_resid_code: Average absolute code residual

    Examples
    --------
    >>> analyzer = STATAnalyzer('solution.pos.stat')
    >>> analyzer.parse()
    >>> global_stats = analyzer.get_global_stats()
    >>> print(global_stats)
    """
    if self.df.is_empty():
        return pl.DataFrame()

    # Residuals are aggregated on absolute value so opposite-signed
    # errors do not cancel each other out.
    aggregations = [
        pl.col("snr").mean().alias("mean_snr"),
        pl.col("resid_phase").abs().mean().alias("mean_resid_phase"),
        pl.col("resid_code").abs().mean().alias("mean_resid_code"),
    ]
    grouped = self.df.group_by("frequency").agg(aggregations)
    return grouped.sort("frequency")

RTKLIBReport

RTKLIBReport(pos_file: str | Path | None = None, stat_file: str | Path | None = None, pos_analyzer: POSAnalyzer | None = None, stat_analyzer: STATAnalyzer | None = None, plotter: PPKPlotter | None = None)

Generate comprehensive RTKLIB solution quality reports.

Orchestrates POSAnalyzer, STATAnalyzer, and PPKPlotter to produce detailed markdown reports analyzing RTK solution quality.

Attributes:

Name Type Description
pos POSAnalyzer

Analyzer for position solution data

stat STATAnalyzer

Analyzer for processing statistics

plotter PPKPlotter

Plotter for visualization

Examples:

>>> report = RTKLIBReport(pos_file='solution.pos', stat_file='solution.pos.stat')
>>> report.generate('ppk_report', plot_dir='plots')
>>> # Creates ppk_report.md with plots in plots/ subfolder

Initialize RTKLIB report generator.

Args: pos_file: Path to .pos file stat_file: Path to .pos.stat file pos_analyzer: Existing POSAnalyzer instance stat_analyzer: Existing STATAnalyzer instance plotter: Existing PPKPlotter instance

Examples: >>> # From files >>> report = RTKLIBReport(pos_file='solution.pos', stat_file='solution.pos.stat') >>> # From existing analyzers >>> pos = POSAnalyzer('solution.pos') >>> pos.parse() >>> report = RTKLIBReport(pos_analyzer=pos)

Source code in pils/analyze/ppkdata/PPK/report.py
def __init__(
    self,
    pos_file: str | Path | None = None,
    stat_file: str | Path | None = None,
    pos_analyzer: POSAnalyzer | None = None,
    stat_analyzer: STATAnalyzer | None = None,
    plotter: PPKPlotter | None = None,
) -> None:
    """Initialize RTKLIB report generator.

    Args:
        pos_file: Path to .pos file
        stat_file: Path to .pos.stat file
        pos_analyzer: Existing POSAnalyzer instance
        stat_analyzer: Existing STATAnalyzer instance
        plotter: Existing PPKPlotter instance

    Examples:
        >>> # From files
        >>> report = RTKLIBReport(pos_file='solution.pos', stat_file='solution.pos.stat')
        >>> # From existing analyzers
        >>> pos = POSAnalyzer('solution.pos')
        >>> pos.parse()
        >>> report = RTKLIBReport(pos_analyzer=pos)
    """
    # Default both analyzers to None so self.pos / self.stat always exist.
    # Previously the attributes were left unset when neither a file nor an
    # analyzer was supplied, and the PPKPlotter construction below raised
    # AttributeError.
    self.pos: POSAnalyzer | None = None
    self.stat: STATAnalyzer | None = None

    if pos_analyzer is not None:
        self.pos = pos_analyzer
    elif pos_file is not None:
        self.pos = POSAnalyzer(pos_file)
        self.pos.parse()  # Parse the .pos file
    else:
        logger.info(
            ".pos file or pos_analyzer is necessary to generate the report"
        )

    if stat_analyzer is not None:
        self.stat = stat_analyzer
    elif stat_file is not None:
        self.stat = STATAnalyzer(stat_file)
        self.stat.parse()  # Parse the .stat file
    else:
        logger.info(
            ".stat file or stat_analyzer is necessary to generate the report"
        )

    # Reuse a caller-supplied plotter, otherwise build one over the analyzers.
    if plotter is not None:
        self.plotter = plotter
    else:
        self.plotter = PPKPlotter(self.pos, self.stat)

generate

generate(output_dir: str = 'rtklib_quality_report', plot_dir: str = 'assets') -> str

Generate high-fidelity markdown report for RTKLIB outputs.

Args: output_dir: Directory for report output plot_dir: Subdirectory name for plot assets

Returns: Path to generated report file

Examples: >>> report = RTKLIBReport(pos_file='solution.pos', stat_file='solution.pos.stat') >>> report_path = report.generate('my_report', plot_dir='figures') >>> print(f"Report saved to: {report_path}")

Source code in pils/analyze/ppkdata/PPK/report.py
def generate(
    self, output_dir: str = "rtklib_quality_report", plot_dir: str = "assets"
) -> str:
    """Generate high-fidelity markdown report for RTKLIB outputs.

    Args:
        output_dir: Directory for report output
        plot_dir: Subdirectory name for plot assets

    Returns:
        Path to generated report file

    Examples:
        >>> report = RTKLIBReport(pos_file='solution.pos', stat_file='solution.pos.stat')
        >>> report_path = report.generate('my_report', plot_dir='figures')
        >>> print(f"Report saved to: {report_path}")
    """
    # Output layout: <output_dir>/report.md with figures written under
    # <output_dir>/<plot_dir>/; markdown image links are relative to report.md.
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    assets_dir = output_path / plot_dir
    assets_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Generating RTKLIB Quality Report in '{output_dir}'")

    report = "# RTKLIB Solution Quality Analysis\n\n"
    report += f"**Analysis Date:** {datetime.now():%Y-%m-%d %H:%M:%S}\n"
    report += "## Executive Solution Scoreboard\n"

    # NOTE(review): assumes self.plotter / self.pos / self.stat were assigned
    # during construction — confirm the __init__ contract for the case where
    # neither files nor analyzers were supplied.
    if self.plotter and self.stat:
        # Skyplot at the very beginning
        sky_path = assets_dir / "skyplot_rtklib.png"
        logger.debug("Generating RTKLIB Skyplot")
        self.plotter.plot_skyplot_snr(str(sky_path))
        # Every figure is guarded with exists(): a failed plot just drops the
        # corresponding section rather than breaking report generation.
        if sky_path.exists():
            report += f"![Skyplot]({plot_dir}/skyplot_rtklib.png)\n\n"

    # 1. Solution Statistics (Fix Rate)
    if self.pos:
        stats = self.pos.get_statistics()

        # Qualitative banding of the ambiguity-fixed epoch percentage.
        fix_rate = stats.get("fix_rate", 0)
        status = (
            "EXCELLENT"
            if fix_rate > 95
            else "GOOD"
            if fix_rate > 80
            else "FAIR"
            if fix_rate > 50
            else "POOR"
        )

        report += f"### Fix Rate: **{fix_rate:.1f}%** ({status})\n\n"
        report += "#### Epoch Distribution\n"
        report += "| Status | Epochs | Percentage |\n"
        report += "|---|---|---|\n"
        # NOTE(review): divides by total_epochs — a solution with zero epochs
        # would raise ZeroDivisionError; confirm the parser guarantees >= 1.
        report += f"| Fix (Q=1) | {stats['fix_epochs']} | {(stats['fix_epochs'] / stats['total_epochs'] * 100):.1f}% |\n"
        report += f"| Float (Q=2) | {stats['float_epochs']} | {(stats['float_epochs'] / stats['total_epochs'] * 100):.1f}% |\n"
        report += f"| Single (Q=5) | {stats['single_epochs']} | {(stats['single_epochs'] / stats['total_epochs'] * 100):.1f}% |\n\n"

        report += f"**Total Epochs:** {stats['total_epochs']} | **Avg Ratio:** {stats['avg_ratio']:.2f} | **Avg Sat Count:** {stats['avg_ns']:.1f}\n\n"

    # 2. ENU & Trajectory Dashboards
    if self.plotter and self.pos:
        # ENU Time Series
        enu_path = assets_dir / "enu_stability.png"
        logger.debug("Generating ENU Stability plot")
        self.plotter.plot_enu_time_series(str(enu_path))
        if enu_path.exists():
            report += "## Coordinate Stability (ENU)\n"
            report += f"![ENU]({plot_dir}/enu_stability.png)\n\n"

        # Trajectory
        traj_path = assets_dir / "trajectory.png"
        logger.debug("Building Trajectory Map")
        self.plotter.plot_trajectory_q(str(traj_path))
        if traj_path.exists():
            report += "## Solution Trajectory\n"
            report += f"![Trajectory]({plot_dir}/trajectory.png)\n\n"

        # Ratio
        ratio_path = assets_dir / "ratio_time.png"
        logger.debug("Generating Ratio stability plot")
        self.plotter.plot_ratio_time(str(ratio_path))
        if ratio_path.exists():
            report += "## AR Ratio Stability\n"
            report += f"![Ratio]({plot_dir}/ratio_time.png)\n\n"

    # 3. Residual & Signal Analysis (from .stat)
    if self.stat:
        sat_stats = self.stat.get_satellite_stats()
        global_stats = self.stat.get_global_stats()

        report += "## Signal & Residual Analysis\n"

        if self.plotter:
            snr_trend_path = assets_dir / "snr_stability.png"
            logger.debug("Generating SNR stability trend")
            self.plotter.plot_avg_snr_time_series(str(snr_trend_path))
            if snr_trend_path.exists():
                report += "### Signal Strength Stability (SNR)\n"
                report += f"![SNR Stability]({plot_dir}/snr_stability.png)\n\n"

        report += "### Global Per-Band Metrics\n"
        report += (
            "| Band | Mean SNR | Mean Phase Resid (m) | Mean Code Resid (m) |\n"
        )
        report += "|---|---|---|---|\n"
        for row in global_stats.iter_rows(named=True):
            report += f"| {row['frequency']} | {row['mean_snr']:.1f} | {row['mean_resid_phase']:.4f} | {row['mean_resid_code']:.3f} |\n"
        report += "\n"

        if self.plotter:
            resid_path = assets_dir / "residuals_multi.png"
            logger.debug("Generating Multi-Band residual distributions")
            self.plotter.plot_residual_distribution_dual(str(resid_path))
            if resid_path.exists():
                report += "### Localized Residual Distributions\n"
                report += f"![Residuals]({plot_dir}/residuals_multi.png)\n\n"

            snr_el_path = assets_dir / "snr_vs_el.png"
            self.plotter.plot_snr_vs_elevation(str(snr_el_path))
            if snr_el_path.exists():
                report += "### SNR vs Elevation\n"
                report += f"![SNR_EL]({plot_dir}/snr_vs_el.png)\n\n"

        # Constellation-Specific Residuals
        # One section per constellation letter found in the parsed $SAT
        # records (first character of the satellite id).
        constellations = sorted(self.stat.df["constellation"].unique().to_list())
        report += "## Constellation-Specific Performance\n"
        for const in constellations:
            # Fall back to the raw constellation letter when no friendly
            # name is mapped (or the mapped value is falsy).
            c_full_name = CONSTELLATION_NAMES.get(const, const)
            if c_full_name:
                c_full_name = c_full_name.upper()
            else:
                c_full_name = const.upper()
            report += f"### {c_full_name} Analysis\n"

            # SNR Time Series
            # hasattr guards keep the report working against plotter
            # implementations that lack these optional methods.
            if self.plotter:
                snr_t_path = assets_dir / f"snr_ts_{const}.png"
                if hasattr(self.plotter, "plot_constellation_snr_time_series"):
                    self.plotter.plot_constellation_snr_time_series(
                        const, str(snr_t_path)
                    )
                if snr_t_path.exists():
                    report += f"#### {c_full_name} SNR Stability over Time\n![SNR]({plot_dir}/snr_ts_{const}.png)\n\n"

            # Histograms
            if self.plotter:
                h_path = assets_dir / f"resid_hist_{const}.png"
                if hasattr(self.plotter, "plot_stat_constellation_hists_dual"):
                    self.plotter.plot_stat_constellation_hists_dual(
                        const, str(h_path)
                    )
                if h_path.exists():
                    report += f"#### {c_full_name} Phase & Code Residuals\n![Hist]({plot_dir}/resid_hist_{const}.png)\n\n"

            # Bar Chart
            if self.plotter:
                b_path = assets_dir / f"resid_bar_{const}.png"
                if hasattr(self.plotter, "plot_sat_residual_bar"):
                    self.plotter.plot_sat_residual_bar(const, str(b_path))
                if b_path.exists():
                    report += f"#### {c_full_name} Per-Satellite Peak Residuals\n![Bar]({plot_dir}/resid_bar_{const}.png)\n\n"

        report += "## Satellite Quality Audit\n"
        report += "Analyzed satellites ranked by typical Carrier Phase stability (P95 Phase Residual).\n\n"

        # Top 10 Best
        report += "### Top 10 Best Performers (Lowest Error)\n"
        report += (
            "| Sat | Band | Mean SNR | P95 Phase Resid (m) | Slips | Rejects |\n"
        )
        report += "|---|---|---|---|---|---|\n"
        for row in (
            sat_stats.sort("p95_resid_phase", descending=False)
            .head(10)
            .iter_rows(named=True)
        ):
            report += f"| {row['satellite']} | {row['frequency']} | {row['avg_snr']:.1f} | {row['p95_resid_phase']:.4f} | {row['total_slips']} | {row['total_rejects']} |\n"
        report += "\n"

        # Top 10 Worst
        report += "### Top 10 Worst Performers (Highest Error)\n"
        report += (
            "| Sat | Band | Mean SNR | P95 Phase Resid (m) | Slips | Rejects |\n"
        )
        report += "|---|---|---|---|---|---|\n"
        for row in (
            sat_stats.sort("p95_resid_phase", descending=True)
            .head(10)
            .iter_rows(named=True)
        ):
            report += f"| {row['satellite']} | {row['frequency']} | {row['avg_snr']:.1f} | {row['p95_resid_phase']:.4f} | {row['total_slips']} | {row['total_rejects']} |\n"
        report += "\n"

    report_path = output_path / "report.md"
    report_path.write_text(report)

    logger.info(f"RTKLIB Quality Report generated: {report_path}")
    return str(report_path)

PPKPlotter

PPKPlotter(pos_analyzer=None, stat_analyzer=None)

Visualization engine for PPK analysis results.

Creates plots for PPK position solutions and processing statistics.

Attributes:

Name Type Description
pos POSAnalyzer

Position solution analyzer

stat STATAnalyzer

Processing statistics analyzer

Examples:

>>> pos_analyzer = POSAnalyzer('solution.pos')
>>> pos_analyzer.parse()
>>> stat_analyzer = STATAnalyzer('solution.pos.stat')
>>> stat_analyzer.parse()
>>> plotter = PPKPlotter(pos_analyzer, stat_analyzer)
>>> plotter.plot_trajectory_q('trajectory.png')

Initialize PPK plotter.

Args: pos_analyzer: POSAnalyzer instance with parsed position data stat_analyzer: STATAnalyzer instance with parsed statistics

Examples: >>> pos_analyzer = POSAnalyzer('solution.pos') >>> pos_analyzer.parse() >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer)

Source code in pils/analyze/ppkdata/PPK/plotter.py
def __init__(self, pos_analyzer=None, stat_analyzer=None):
    """Create a plotter over optional position and statistics analyzers.

    Args:
        pos_analyzer: POSAnalyzer instance with parsed position data
        stat_analyzer: STATAnalyzer instance with parsed statistics

    Examples:
        >>> pos_analyzer = POSAnalyzer('solution.pos')
        >>> pos_analyzer.parse()
        >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer)
    """
    # Either analyzer may be None; each plot method guards for that itself.
    self.stat = stat_analyzer
    self.pos = pos_analyzer

plot_skyplot_snr

plot_skyplot_snr(save_path=None)

Polar skyplot with SNR tracks.

Args: save_path: Output file path

Examples: >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer) >>> plotter.plot_skyplot_snr('skyplot.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_skyplot_snr(self, save_path=None):
    """Polar skyplot with SNR tracks.

    Args:
        save_path: Output file path; if None the figure is shown interactively

    Examples:
        >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
        >>> plotter.plot_skyplot_snr('skyplot.png')
    """
    # No-op when no statistics analyzer or no parsed $SAT data is available.
    if not self.stat or self.stat.df.is_empty():
        return

    df = self.stat.df
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111, projection="polar")
    fig.patch.set_alpha(0)
    # Set polar plot properties with try/except for compatibility
    try:
        ax.set_theta_zero_location("N")  # type: ignore
    except AttributeError:
        pass
    try:
        ax.set_theta_direction(-1)  # type: ignore
    except AttributeError:
        pass
    try:
        ax.set_rlim(90, 0)  # type: ignore
    except AttributeError:
        pass

    # Plot all points
    # NOTE(review): the radius uses 90 - elevation AND set_rlim(90, 0) above
    # reverses the radial axis — the two inversions may cancel each other;
    # confirm the zenith actually renders at the plot center.
    sc = ax.scatter(
        np.deg2rad(df["azimuth"]),
        90 - df["elevation"],
        c=df["snr"],
        cmap="viridis",
        s=5,
        alpha=0.5,
    )
    plt.colorbar(sc, ax=ax, label="SNR (dB-Hz)")

    # Add labels for each satellite
    # Each satellite id is anchored at its mean azimuth/elevation position.
    for sat in df["satellite"].unique().to_list():
        sub = df.filter(pl.col("satellite") == sat)
        if not sub.is_empty():
            ax.text(
                np.deg2rad(sub["azimuth"].mean()),
                90 - sub["elevation"].mean(),
                sat,
                fontsize=8,
                fontweight="bold",
                ha="center",
                bbox=dict(facecolor="white", alpha=0.5, boxstyle="round,pad=0.2"),
            )

    ax.set_title(
        "Skyplot: Satellite Tracks (Colored by SNR)",
        fontweight="bold",
        fontsize=14,
        pad=30,
    )
    GNSSColors.apply_theme(ax)
    plt.tight_layout()

    # Save to file when a path is given, otherwise show interactively.
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_trajectory_q

plot_trajectory_q(save_path=None)

Plots trajectory (Lon vs Lat) colored by Q status.

Args: save_path: Output file path

Examples: >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer) >>> plotter.plot_trajectory_q('trajectory.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_trajectory_q(self, save_path=None):
    """Plots trajectory (Lon vs Lat) colored by Q status.

    Args:
        save_path: Output file path

    Examples:
        >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer)
        >>> plotter.plot_trajectory_q('trajectory.png')
    """
    if not self.pos or self.pos.df.is_empty():
        return

    solution = self.pos.df
    # RTKLIB quality flags: 1=Fix, 2=Float, 5=Single.
    status_colors = {1: GNSSColors.FIX, 2: GNSSColors.FLOAT, 5: GNSSColors.SINGLE}
    status_labels = {1: "Fix", 2: "Float", 5: "Single"}

    fig, ax = plt.subplots(figsize=(10, 8))
    fig.patch.set_alpha(0)

    # One scatter layer per quality flag so each gets its own legend entry.
    for quality in sorted(solution["Q"].unique().to_list()):
        subset = solution.filter(pl.col("Q") == quality)
        ax.scatter(
            subset["lon"],
            subset["lat"],
            c=status_colors.get(quality, "black"),
            label=status_labels.get(quality, f"Q={quality}"),
            s=15,
            alpha=0.7,
        )

    ax.set_title("Trajectory Map (Colored by Solution Quality)", fontweight="bold")
    ax.set_xlabel("Longitude (deg)")
    ax.set_ylabel("Latitude (deg)")
    ax.legend()
    GNSSColors.apply_theme(ax)
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_ratio_time

plot_ratio_time(save_path=None)

Plot ambiguity ratio over time.

Args: save_path: Output file path

Examples: >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer) >>> plotter.plot_ratio_time('ratio_time.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_ratio_time(self, save_path=None):
    """Plot ambiguity-resolution (AR) validation ratio over time.

    Args:
        save_path: Output file path; if None the figure is shown interactively

    Examples:
        >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer)
        >>> plotter.plot_ratio_time('ratio_time.png')
    """
    if not self.pos or self.pos.df.is_empty():
        return

    df = self.pos.df
    fig, ax = plt.subplots(figsize=(14, 5))
    fig.patch.set_alpha(0)

    ax.plot(
        df["time"].to_numpy(), df["ratio"].to_numpy(), color="purple", linewidth=1.5
    )
    # 3.0 is RTKLIB's conventional AR validation threshold for a fixed solution.
    ax.axhline(3.0, color="red", linestyle="--", label="Fix Threshold (3.0)")

    ax.set_title("AR Ratio vs Time", fontweight="bold")
    ax.set_ylabel("Ratio")
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
    ax.legend()
    GNSSColors.apply_theme(ax)
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_snr_vs_elevation

plot_snr_vs_elevation(save_path=None)

Scatter plot of SNR vs elevation angle.

Args: save_path: Output file path

Examples: >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer) >>> plotter.plot_snr_vs_elevation('snr_elevation.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_snr_vs_elevation(self, save_path=None):
    """Scatter plot of SNR vs elevation angle, split by frequency band.

    Args:
        save_path: Output file path; if None the figure is shown interactively

    Examples:
        >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
        >>> plotter.plot_snr_vs_elevation('snr_elevation.png')
    """
    if not self.stat or self.stat.df.is_empty():
        return

    df = self.stat.df
    fig, ax = plt.subplots(figsize=(10, 6))
    fig.patch.set_alpha(0)

    bands = sorted(df["frequency"].unique().to_list())
    # Theme colors are cycled if there are more bands than colors.
    colors = [
        GNSSColors.BAND_PRIMARY,
        GNSSColors.BAND_SECONDARY,
        GNSSColors.BAND_TERTIARY,
    ]

    for i, b in enumerate(bands):
        sub = df.filter(pl.col("frequency") == b)
        ax.scatter(
            sub["elevation"].to_numpy(),
            sub["snr"].to_numpy(),
            color=colors[i % len(colors)],
            s=10,
            alpha=0.3,
            label=f"Band {b}",
        )

    ax.set_title("Signal Strength (SNR) vs Elevation", fontweight="bold")
    ax.set_xlabel("Elevation (deg)")
    ax.set_ylabel("SNR (dB-Hz)")
    ax.legend()
    GNSSColors.apply_theme(ax)
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_enu_time_series

plot_enu_time_series(save_path=None)

Plot East-North-Up offsets over time.

Args: save_path: Output file path

Examples: >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer) >>> plotter.plot_enu_time_series('enu_timeseries.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_enu_time_series(self, save_path=None):
    """Plot East/North/Up position deviations over time, colored by Q status.

    Args:
        save_path: Output file path; if None the figure is shown interactively

    Examples:
        >>> plotter = PPKPlotter(pos_analyzer=pos_analyzer)
        >>> plotter.plot_enu_time_series('enu_timeseries.png')
    """
    if not self.pos or self.pos.df.is_empty():
        return
    # ENU columns are only present for solutions exported in ENU form.
    if "east" not in self.pos.df.columns:
        return

    df = self.pos.df
    q_colors = {1: GNSSColors.FIX, 2: GNSSColors.FLOAT, 5: GNSSColors.SINGLE}
    q_labels = {1: "Fix", 2: "Float", 5: "Single"}

    fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)
    fig.patch.set_alpha(0)

    cols = ["east", "north", "up"]
    labels = ["East (m)", "North (m)", "Up (m)"]

    for i, (col, label) in enumerate(zip(cols, labels, strict=False)):
        # Faint connecting line under the colored scatter for readability.
        axes[i].plot(
            df["time"].to_numpy(),
            df[col].to_numpy(),
            color="black",
            linewidth=0.5,
            alpha=0.2,
        )
        for q in sorted(df["Q"].unique().to_list()):
            sub = df.filter(pl.col("Q") == q)
            axes[i].scatter(
                sub["time"].to_numpy(),
                sub[col].to_numpy(),
                c=q_colors.get(q, "gray"),
                s=2,
                # Labels only on the top subplot to avoid a triple legend.
                label=q_labels.get(q, f"Q={q}") if i == 0 else "",
            )
        axes[i].set_ylabel(label, fontweight="bold")
        GNSSColors.apply_theme(axes[i])

    axes[0].set_title(
        "ENU Position Deviations Over Time (Colored by Fix Q)",
        fontweight="bold",
        fontsize=14,
    )
    axes[2].set_xlabel("Time", fontweight="bold")
    axes[2].xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
    # Only show a legend when fixed (Q=1) epochs exist.
    if 1 in df["Q"]:
        axes[0].legend(loc="upper right")

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_residual_distribution_dual

plot_residual_distribution_dual(save_path=None)

Plot histogram of phase residuals for L1/L2.

Args: save_path: Output file path

Examples: >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer) >>> plotter.plot_residual_distribution_dual('residuals.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_residual_distribution_dual(self, save_path=None):
    """Plot side-by-side residual histograms per frequency band.

    Left panel: carrier-phase residuals (±0.1 m window). Right panel:
    pseudorange (code) residuals (±5 m window). Bands are overlaid with alpha.

    Args:
        save_path: Output file path; if None the figure is shown interactively

    Examples:
        >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
        >>> plotter.plot_residual_distribution_dual('residuals.png')
    """
    if not self.stat or self.stat.df.is_empty():
        return

    df = self.stat.df
    bands = sorted(df["frequency"].unique().to_list())
    colors = [GNSSColors.BAND_PRIMARY, GNSSColors.BAND_SECONDARY]

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))
    fig.patch.set_alpha(0)

    for i, b in enumerate(bands):
        sub = df.filter(pl.col("frequency") == b)
        ax1.hist(
            sub["resid_phase"].to_numpy(),
            bins=100,
            range=(-0.1, 0.1),
            color=colors[i % 2],
            alpha=0.5,
            label=f"Band {b}",
            edgecolor="black",
        )
        ax2.hist(
            sub["resid_code"].to_numpy(),
            bins=100,
            range=(-5, 5),
            color=colors[i % 2],
            alpha=0.5,
            label=f"Band {b}",
            edgecolor="black",
        )

    ax1.set_title("Carrier Phase Residual Distribution", fontweight="bold")
    ax1.set_xlabel("Residual (m)")
    ax1.legend()
    GNSSColors.apply_theme(ax1)

    ax2.set_title("Pseudorange Residual Distribution", fontweight="bold")
    ax2.set_xlabel("Residual (m)")
    ax2.legend()
    GNSSColors.apply_theme(ax2)

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_avg_snr_time_series

plot_avg_snr_time_series(save_path=None)

Plot average SNR per frequency over time.

Args: save_path: Output file path

Examples: >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer) >>> plotter.plot_avg_snr_time_series('avg_snr_time.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_avg_snr_time_series(self, save_path=None):
    """Plot per-band average SNR over time with ±1 std-dev shading.

    Args:
        save_path: Output file path; if None the figure is shown interactively

    Examples:
        >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
        >>> plotter.plot_avg_snr_time_series('avg_snr_time.png')
    """
    if not self.stat or self.stat.df.is_empty():
        return

    # Per-epoch (tow) mean/std of SNR for each band; n_sats is computed but
    # only the mean/std are currently rendered.
    agg = (
        self.stat.df.group_by(["tow", "frequency"])
        .agg(
            [
                pl.col("snr").mean().alias("avg_snr"),
                pl.col("snr").std().alias("std_snr"),
                pl.col("satellite").count().alias("n_sats"),
            ]
        )
        .sort(["tow", "frequency"])
    )

    if agg.is_empty():
        return

    bands = sorted(agg["frequency"].unique().to_list())
    fig, ax = plt.subplots(figsize=(14, 7))
    fig.patch.set_alpha(0)

    for i, b in enumerate(bands):
        sub = agg.filter(pl.col("frequency") == b)
        t = sub["tow"].to_numpy()
        y = sub["avg_snr"].to_numpy()
        # std is null for epochs with a single satellite; treat it as zero.
        s = sub["std_snr"].fill_null(0).to_numpy()

        line_color = (
            GNSSColors.BAND_PRIMARY if i == 0 else GNSSColors.BAND_SECONDARY
        )
        ax.plot(t, y, color=line_color, label=f"Band {b} Mean", linewidth=2)
        # Increased alpha and better contrast for shading
        ax.fill_between(t, y - s, y + s, color=line_color, alpha=0.2)

        # Simple scatter for points, no colorbar as requested
        ax.scatter(t, y, color=line_color, s=15, alpha=0.6)

    ax.set_title(
        "Average Signal Strength (SNR) Stability Over Time",
        fontweight="bold",
        fontsize=14,
    )
    ax.set_xlabel("Time (TOW)")
    ax.set_ylabel("SNR (dB-Hz)")
    ax.legend()
    GNSSColors.apply_theme(ax)

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_stat_constellation_hists_dual

plot_stat_constellation_hists_dual(const, save_path=None)

Plot dual-panel SNR histograms for constellation.

Args: const: Constellation code ('G', 'R', 'E', 'C') save_path: Output file path

Examples: >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer) >>> plotter.plot_stat_constellation_hists_dual('G', 'gps_histograms.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_stat_constellation_hists_dual(self, const, save_path=None):
    """Plot phase and code residual histograms for one constellation.

    Left panel: carrier-phase residuals (±0.1 m). Right panel: pseudorange
    (code) residuals (±5 m). Bands are overlaid with alpha.

    Args:
        const: Constellation code ('G', 'R', 'E', 'C')
        save_path: Output file path; if None the figure is shown interactively

    Examples:
        >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
        >>> plotter.plot_stat_constellation_hists_dual('G', 'gps_histograms.png')
    """
    if not self.stat or self.stat.df.is_empty():
        return
    df = self.stat.df.filter(pl.col("constellation") == const)
    if df.is_empty():
        return

    bands = sorted(df["frequency"].unique().to_list())
    colors = [GNSSColors.BAND_PRIMARY, GNSSColors.BAND_SECONDARY]

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    fig.patch.set_alpha(0)

    for i, b in enumerate(bands):
        sub = df.filter(pl.col("frequency") == b)
        ax1.hist(
            sub["resid_phase"].to_numpy(),
            bins=50,
            range=(-0.1, 0.1),
            color=colors[i % 2],
            alpha=0.5,
            label=f"Band {b}",
            edgecolor="black",
        )
        ax2.hist(
            sub["resid_code"].to_numpy(),
            bins=50,
            range=(-5, 5),
            color=colors[i % 2],
            alpha=0.5,
            label=f"Band {b}",
            edgecolor="black",
        )

    ax1.set_title(f"Phase Residuals: {const}", fontweight="bold")
    ax1.set_xlabel("Residual (m)")
    ax1.legend()
    GNSSColors.apply_theme(ax1)

    ax2.set_title(f"Code Residuals: {const}", fontweight="bold")
    ax2.set_xlabel("Residual (m)")
    ax2.legend()
    GNSSColors.apply_theme(ax2)

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_constellation_snr_time_series

plot_constellation_snr_time_series(const, save_path=None)

Plot SNR time series for specific constellation.

Args: const: Constellation code save_path: Output file path

Examples: >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer) >>> plotter.plot_constellation_snr_time_series('E', 'galileo_snr_time.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_constellation_snr_time_series(self, const, save_path=None):
    """Plot SNR time series for a specific constellation.

    Splits SNR vs. time into one subplot per frequency band, drawing one
    line per satellite of the requested constellation.

    Args:
        const: Constellation code (e.g. 'G', 'E', 'C').
        save_path: Optional output file path. When given, the figure is
            saved with a transparent background and closed; otherwise it
            is shown interactively.

    Examples:
        >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
        >>> plotter.plot_constellation_snr_time_series('E', 'galileo_snr_time.png')
    """
    # Nothing to plot without loaded statistics.
    if not self.stat or self.stat.df.is_empty():
        return
    df = self.stat.df.filter(pl.col("constellation") == const)
    if df.is_empty():
        return

    bands = sorted(df["frequency"].unique().to_list())
    fig, axes = plt.subplots(
        len(bands), 1, figsize=(14, 5 * len(bands)), sharex=True
    )
    # plt.subplots returns a bare Axes (not an array) when there is only
    # one subplot; normalize to a list so indexing below always works.
    if len(bands) == 1:
        axes = [axes]
    fig.patch.set_alpha(0)

    for i, b in enumerate(bands):
        sub_b = df.filter(pl.col("frequency") == b)
        sats = sorted(sub_b["satellite"].unique().to_list())

        for sat in sats:
            # Sort by time-of-week so each satellite trace is drawn in order.
            sat_data = sub_b.filter(pl.col("satellite") == sat).sort("tow")
            axes[i].plot(
                sat_data["tow"].to_numpy(),
                sat_data["snr"].to_numpy(),
                label=sat,
                alpha=0.7,
            )

        axes[i].set_title(f"SNR stability - Band {b}", fontweight="bold")
        axes[i].set_ylabel("SNR (dB-Hz)")
        # Legend outside the axes to avoid covering the traces.
        axes[i].legend(
            bbox_to_anchor=(1.01, 1), loc="upper left", fontsize="x-small", ncol=2
        )
        GNSSColors.apply_theme(axes[i])

    axes[-1].set_xlabel("Time (TOW)")
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()

plot_sat_residual_bar

plot_sat_residual_bar(const, save_path=None)

Plot bar chart of residuals per satellite.

Args:
    const: Constellation code
    save_path: Output file path

Examples:
    >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
    >>> plotter.plot_sat_residual_bar('C', 'beidou_residuals.png')

Source code in pils/analyze/ppkdata/PPK/plotter.py
def plot_sat_residual_bar(self, const, save_path=None):
    """Plot a grouped bar chart of P95 phase residuals per satellite.

    One group of bars per satellite, one bar per frequency band, for the
    requested constellation. A dashed red line marks the 5 cm target.

    Args:
        const: Constellation code (e.g. 'G', 'E', 'C').
        save_path: Optional output file path. When given, the figure is
            saved with a transparent background and closed; otherwise it
            is shown interactively.

    Examples:
        >>> plotter = PPKPlotter(stat_analyzer=stat_analyzer)
        >>> plotter.plot_sat_residual_bar('C', 'beidou_residuals.png')
    """
    # Nothing to plot without loaded statistics.
    if not self.stat or self.stat.df.is_empty():
        return
    # Satellite IDs start with their constellation letter (e.g. 'C01').
    stats = self.stat.get_satellite_stats().filter(
        pl.col("satellite").str.starts_with(const)
    )
    if stats.is_empty():
        return

    sats = sorted(stats["satellite"].unique().to_list())
    bands = sorted(stats["frequency"].unique().to_list())

    fig, ax = plt.subplots(figsize=(14, 6))
    fig.patch.set_alpha(0)

    x = np.arange(len(sats))
    # Split a 0.8-wide group evenly among the bands.
    width = 0.8 / len(bands)

    for i, band in enumerate(bands):
        sub = stats.filter(pl.col("frequency") == band)
        vals = []
        for s in sats:
            match = sub.filter(pl.col("satellite") == s)
            # Missing satellite/band combinations plot as zero-height bars.
            vals.append(
                match["p95_resid_phase"].sum() if not match.is_empty() else 0
            )

        ax.bar(
            x + i * width - 0.4 + width / 2,
            vals,
            width,
            color=[GNSSColors.BAND_PRIMARY, GNSSColors.BAND_SECONDARY][i % 2],
            label=f"Band {band}",
            alpha=0.8,
        )

    ax.set_xticks(x)
    ax.set_xticklabels(sats, rotation=45)
    ax.set_ylabel("P95 Phase Residual (m)", fontweight="bold")
    ax.set_title(f"Satellite Peak Phase Errors: {const}", fontweight="bold")
    # Reference line: 5 cm accuracy target.
    ax.axhline(0.05, color="red", ls="--", alpha=0.5, label="Target (5cm)")
    ax.legend()
    GNSSColors.apply_theme(ax)
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, transparent=True)
        plt.close()
    else:
        plt.show()