Skip to content

Flight

Flight

Flight(flight_info: dict[str, Any])

This class provides a hierarchical structure to store and access drone flight data and sensor payloads. Data is stored in RAM for fast access using both attribute and dictionary-style notation.

Attributes:

Name Type Description
flight_info Dict

Dictionary containing flight configuration paths

flight_path Path

Path to the flight directory

metadata Dict

Flight metadata (duration, date, conditions, etc.)

raw_data RawData

Container for drone and payload sensor data

sync_data Optional[dict[str, DataFrame]]

Synchronized flight data (populated after calling sync())

adc_gain_config Optional

Configuration for ADC gain settings

Examples:

>>> # Create a flight instance
>>> flight_info = {
...     "drone_data_folder_path": "/data/flight_001/drone",
...     "aux_data_folder_path": "/data/flight_001/aux"
... }
>>> flight = Flight(flight_info)
>>> # Add metadata
>>> flight.set_metadata({
...     'flight_time': '2025-01-28 14:30:00',
...     'duration': 1800,
...     'weather': 'clear'
... })
>>> # Load drone data (auto-detects DJI or BlackSquare)
>>> flight.add_drone_data(dji_drone_loader='dat')
>>> # Load sensor data
>>> flight.add_sensor_data(['gps', 'imu', 'adc'])
>>> # Load camera data (Sony or Alvium)
>>> flight.add_camera_data(use_photogrammetry=False, get_sony_angles=True)
>>> # Access data using attributes
>>> drone_df = flight.raw_data.drone_data.drone
>>> gps_df = flight.raw_data.payload_data.gps
>>> camera_df = flight.raw_data.payload_data.camera
>>> # Or use dictionary-style access (same speed!)
>>> drone_df = flight['raw_data']['drone_data']['drone']
>>> gps_df = flight['raw_data']['payload']['gps']
>>> camera_df = flight['raw_data']['payload']['camera']
>>> # Synchronize all data sources
>>> sync_df = flight.sync(target_rate={'drone': 10.0, 'payload': 100.0})
>>> # Perform operations on the data
>>> high_altitude = drone_df.filter(pl.col('altitude') > 100)
>>> print(f"Points above 100m: {len(high_altitude)}")

Initialize a Flight data container.

Parameters:

Name Type Description Default
flight_info Dict

Dictionary containing at minimum: - 'drone_data_folder_path': Path to drone data folder - 'aux_data_folder_path': Path to auxiliary sensor data folder

required

Examples:

>>> flight_info = {
...     "drone_data_folder_path": "/mnt/data/flight_001/drone",
...     "aux_data_folder_path": "/mnt/data/flight_001/aux"
... }
>>> flight = Flight(flight_info)
Source code in pils/flight.py
def __init__(self, flight_info: dict[str, Any]):
    """
    Initialize a Flight data container.

    Parameters
    ----------
    flight_info : Dict
        Dictionary containing at minimum:
        - 'drone_data_folder_path': Path to drone data folder
        - 'aux_data_folder_path': Path to auxiliary sensor data folder

    Examples
    --------
    >>> flight_info = {
    ...     "drone_data_folder_path": "/mnt/data/flight_001/drone",
    ...     "aux_data_folder_path": "/mnt/data/flight_001/aux"
    ... }
    >>> flight = Flight(flight_info)
    """
    self.flight_info = flight_info
    self.flight_path = Path(flight_info["drone_data_folder_path"]).parent
    self.metadata = {}
    self.set_metadata()

    self.raw_data = RawData()
    self.sync_data: dict[str, pl.DataFrame] | None = None
    self.adc_gain_config = None

from_hdf5 classmethod

from_hdf5(filepath: str | Path, sync_version: str | None | bool = None, load_raw: bool = True) -> Flight

Load flight data from HDF5 file.

Loads metadata and raw_data hierarchy. Optionally loads a specific synchronized data version or the latest available version.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Path to HDF5 file

required
sync_version Union[str, None, bool]

Specific sync version to load (e.g., 'rev_20260202_1430'). If None and synchronized data exists, loads latest version. Set to False to skip loading synchronized data.

None
load_raw bool

If True, loads raw_data. If False, only loads metadata and sync data.

True

Returns:

Type Description
Flight

Returns new Flight instance

Raises:

Type Description
ImportError

If h5py is not installed

FileNotFoundError

If HDF5 file doesn't exist

ValueError

If requested sync version not found

Examples:

>>> # Load from file
>>> flight = Flight.from_hdf5('flight_001.h5')
>>> # Load specific sync version
>>> flight = Flight.from_hdf5('flight_001.h5', sync_version='rev_20260202_1430')
>>> # Load only metadata and raw data
>>> flight = Flight.from_hdf5('flight_001.h5', sync_version=False)
Source code in pils/flight.py
@classmethod
def from_hdf5(
    cls,
    filepath: str | Path,
    sync_version: str | None | bool = None,
    load_raw: bool = True,
) -> "Flight":
    """
    Load flight data from HDF5 file.

    Loads metadata and raw_data hierarchy. Optionally loads a specific
    synchronized data version or the latest available version.

    Parameters
    ----------
    filepath : Union[str, Path]
        Path to HDF5 file
    sync_version : Union[str, None, bool], default=None
        Specific sync version to load (e.g., 'rev_20260202_1430').
        If None and synchronized data exists, loads latest version.
        Set to False to skip loading synchronized data.
    load_raw : bool, default=True
        If True, loads raw_data. If False, only loads metadata and sync data.

    Returns
    -------
    Flight
        Returns new Flight instance

    Raises
    ------
    ImportError
        If h5py is not installed
    FileNotFoundError
        If HDF5 file doesn't exist
    ValueError
        If requested sync version not found

    Examples
    --------
    >>> # Load from file
    >>> flight = Flight.from_hdf5('flight_001.h5')
    >>> # Load specific sync version
    >>> flight = Flight.from_hdf5('flight_001.h5', sync_version='rev_20260202_1430')
    >>> # Load only metadata and raw data
    >>> flight = Flight.from_hdf5('flight_001.h5', sync_version=False)
    """

    filepath = Path(filepath)

    if not filepath.exists():
        raise FileNotFoundError(f"HDF5 file not found: {filepath}")

    with h5py.File(str(filepath), "r") as f:
        # Load metadata
        metadata_dict = {}
        flight_info_dict = {}
        if "metadata" in f:
            metadata_group = f["metadata"]
            assert isinstance(metadata_group, h5py.Group)
            for key in metadata_group.attrs:
                if key.startswith("flight_info_"):
                    # Strip prefix and add to flight_info
                    clean_key = key.replace("flight_info_", "", 1)
                    flight_info_dict[clean_key] = metadata_group.attrs[key]
                else:
                    metadata_dict[key] = metadata_group.attrs[key]

        # Create new instance
        flight = cls(
            flight_info=flight_info_dict if flight_info_dict else metadata_dict
        )

        # Load metadata
        if "metadata" in f:
            metadata_group = f["metadata"]
            assert isinstance(metadata_group, h5py.Group)
            flight._load_metadata_from_hdf5(metadata_group, flight)

        # Load raw data
        if load_raw and "raw_data" in f:
            raw_data_group = f["raw_data"]
            assert isinstance(raw_data_group, h5py.Group)
            flight._load_raw_data_from_hdf5(raw_data_group, flight)

        # Load sync_data if available
        if sync_version is not False and "sync_data" in f:
            sync_data_group = f["sync_data"]
            assert isinstance(sync_data_group, h5py.Group)
            available_versions = sorted(
                [k for k in sync_data_group.keys() if k.startswith("rev_")]
            )
            if available_versions:
                # Determine which version to load
                if sync_version is None:
                    # Load latest version
                    sync_version = available_versions[-1]
                elif sync_version not in available_versions:
                    raise ValueError(
                        f"Sync version '{sync_version}' not found. "
                        f"Available versions: {available_versions}"
                    )

                revision_group = sync_data_group[sync_version]
                assert isinstance(revision_group, h5py.Group)
                # Load all datasets in the revision
                sync_dict = {}
                for key in revision_group.keys():
                    dataset_group = revision_group[key]
                    assert isinstance(dataset_group, h5py.Group)
                    df = flight._load_dataframe_from_hdf5(dataset_group)
                    if df is not None:
                        sync_dict[key] = df
                if sync_dict:
                    flight.sync_data = sync_dict

    return flight

set_metadata

set_metadata(metadata: dict[str, Any] | None = None) -> None

Set flight metadata.

Parameters:

Name Type Description Default
metadata Dict[str, Any]

Dictionary containing metadata fields such as flight_time, duration, weather conditions, pilot info, etc.

None

Examples:

>>> flight.set_metadata({
...     'flight_time': '2025-01-28 14:30:00',
...     'duration': 1800,
...     'pilot': 'John Doe',
...     'weather': 'clear',
...     'temperature': 22.5
... })
>>> print(flight.metadata['flight_time'])
'2025-01-28 14:30:00'
Source code in pils/flight.py
def set_metadata(self, metadata: dict[str, Any] | None = None) -> None:
    """
    Set flight metadata.

    Parameters
    ----------
    metadata : Dict[str, Any], optional
        Dictionary containing metadata fields
        such as flight_time, duration, weather conditions, pilot info, etc.

    Examples
    --------
    >>> flight.set_metadata({
    ...     'flight_time': '2025-01-28 14:30:00',
    ...     'duration': 1800,
    ...     'pilot': 'John Doe',
    ...     'weather': 'clear',
    ...     'temperature': 22.5
    ... })
    >>> print(flight.metadata['flight_time'])
    '2025-01-28 14:30:00'
    """
    # First, store all provided metadata fields
    if isinstance(metadata, dict):
        self.metadata.update(metadata)

    # Then extract time-related fields for special processing
    info_source: dict[str, Any] = {}
    if isinstance(metadata, dict):
        info_source = metadata
    elif isinstance(self.flight_info, dict):
        info_source = self.flight_info

    # Support both `takeoff_time`/`landing_time` and `takeoff_datetime`/`landing_datetime`
    takeoff = info_source.get("takeoff_time") or info_source.get("takeoff_datetime")
    landing = info_source.get("landing_time") or info_source.get("landing_datetime")

    if takeoff is not None and landing is not None:
        try:
            self.metadata["takeoff_time"] = takeoff
            # If subtraction works, store duration; otherwise keep raw landing
            self.metadata["flight_time"] = landing - takeoff
        except Exception:
            self.metadata["takeoff_time"] = takeoff
            self.metadata["landing_time"] = landing

    # Optional flight name (only if not already set)
    if "flight_name" in info_source and "flight_name" not in self.metadata:
        self.metadata["flight_name"] = info_source.get("flight_name")

add_drone_data

add_drone_data(dji_dat_loader: bool = True, drone_model: str | None = None)

Load drone telemetry data based on auto-detected drone model.

Automatically detects whether the drone is DJI or BlackSquare and loads the appropriate data format. For DJI drones, also loads Litchi flight logs if available.

Parameters:

Name Type Description Default
dji_dat_loader bool

If True, uses .DAT format for DJI drones. If False, uses .CSV format.

True
drone_model Optional[str]

Drone model to load. If None, will auto-detect.

None

Returns:

Type Description
DroneData

Reference to the loaded drone data

Raises:

Type Description
ValueError

If an unknown drone model is detected

Examples:

>>> # Load DJI drone data using .DAT files (default)
>>> flight.add_drone_data(dji_dat_loader=True)
>>> # Load DJI drone data using .CSV files
>>> flight.add_drone_data(dji_dat_loader=False)
>>> # Access drone telemetry
>>> print(flight.raw_data.drone_data.drone.head())
>>> # Access Litchi waypoint data (if DJI)
>>> if flight.raw_data.drone_data.litchi is not None:
...     print(flight.raw_data.drone_data.litchi.head())
>>> # Alternative: use dictionary access
>>> drone_data = flight['raw_data']['drone_data']['drone']
Source code in pils/flight.py
def add_drone_data(
    self,
    dji_dat_loader: bool = True,
    drone_model: str | None = None,
):
    """
    Load drone telemetry data based on auto-detected drone model.

    Automatically detects whether the drone is DJI or BlackSquare and loads
    the appropriate data format. For DJI drones, also loads Litchi flight logs
    if available.

    Parameters
    ----------
    dji_dat_loader : bool, default=True
        If True, uses .DAT format for DJI drones.
        If False, uses .CSV format.
    drone_model : Optional[str], default=None
        Drone model to load. If None, will auto-detect.

    Returns
    -------
    DroneData
        Reference to the loaded drone data

    Raises
    ------
    ValueError
        If an unknown drone model is detected

    Examples
    --------
    >>> # Load DJI drone data using .DAT files (default)
    >>> flight.add_drone_data(dji_dat_loader=True)
    >>> # Load DJI drone data using .CSV files
    >>> flight.add_drone_data(dji_dat_loader=False)
    >>> # Access drone telemetry
    >>> print(flight.raw_data.drone_data.drone.head())
    >>> # Access Litchi waypoint data (if DJI)
    >>> if flight.raw_data.drone_data.litchi is not None:
    ...     print(flight.raw_data.drone_data.litchi.head())
    >>> # Alternative: use dictionary access
    >>> drone_data = flight['raw_data']['drone_data']['drone']
    """

    # Resolve drone folder
    if not isinstance(self.flight_info, dict):
        raise ValueError(
            "flight_info must be a dict containing 'drone_data_folder_path'"
        )

    drone_folder = self.flight_info.get("drone_data_folder_path")
    if not drone_folder:
        raise ValueError("drone_data_folder_path not found in flight_info")

    if not drone_model:
        drone_model = self._detect_drone_model(str(drone_folder))

    self.__drone_model = drone_model

    # Find candidate files
    available_files = glob.glob(str(drone_folder) + "/*")

    if isinstance(self.__drone_model, str) and "dji" in self.__drone_model.lower():
        drone_data_path = None
        litchi_data_path = None

        for file in available_files:
            fname = file.lower()

            if dji_dat_loader and fname.endswith("drone.dat"):
                drone_data_path = file

            else:
                if fname.endswith("drone.csv") and drone_data_path is None:
                    drone_data_path = file

            if fname.endswith("litchi.csv") and "dji" in self.__drone_model.lower():
                litchi_data_path = file

        logger.info(f"Drone : {drone_data_path}")
        drone = DJIDrone(drone_data_path)
        drone.load_data(use_dat=dji_dat_loader)
        drone_data = drone.data

        litchi_data = None

        if litchi_data_path is not None:
            litchi_loader = Litchi(litchi_data_path)
            litchi_loader.load_data()
            litchi_data = litchi_loader.data

    elif isinstance(self.__drone_model, str) and (
        "black" in self.__drone_model.lower()
        or "blacksquare" in self.__drone_model.lower()
    ):
        drone = BlackSquareDrone(drone_folder)
        drone.load_data()
        drone_data = drone.data
        litchi_data = None

    else:
        try:
            drone = DJIDrone(drone_data_path or str(drone_folder))
            drone.load_data(use_dat=dji_dat_loader)
            drone_data = drone.data
            litchi_loader = Litchi(litchi_data_path or str(drone_folder))
            litchi_loader.load_data()
            litchi_data = litchi_loader.data
        except Exception:
            drone = BlackSquareDrone(str(drone_folder))
            drone.load_data()
            drone_data = drone.data
            litchi_data = None

    self.raw_data.drone_data = DroneData(drone_data, litchi_data)

add_sensor_data

add_sensor_data(sensor_name: str | list[str]) -> None

Load sensor data from the payload.

Loads one or more sensors from the auxiliary data folder. Sensors are automatically detected and loaded based on their type.

Parameters:

Name Type Description Default
sensor_name Union[str, List[str]]

Single sensor name or list of sensor names. Supported sensors: 'gps', 'imu', 'adc', 'inclinometer'

required

Examples:

>>> # Load a single sensor
>>> flight.add_sensor_data('gps')
>>> print(flight.raw_data.payload_data.gps)
>>> # Load multiple sensors at once
>>> flight.add_sensor_data(['gps', 'imu', 'adc'])
>>> # Access sensor data
>>> gps_data = flight.raw_data.payload_data.gps
>>> imu_data = flight.raw_data.payload_data.imu
>>> # Or use dictionary-style
>>> gps_data = flight['raw_data']['payload']['gps']
>>> # Filter GPS data
>>> high_accuracy = gps_data.filter(pl.col('accuracy') < 5.0)
>>> # List all loaded sensors
>>> print(flight.raw_data.payload_data.list_loaded_sensors())
Source code in pils/flight.py
def add_sensor_data(self, sensor_name: str | list[str]) -> None:
    """
    Load sensor data from the payload.

    Loads one or more sensors from the auxiliary data folder. Sensors are
    automatically detected and loaded based on their type.

    Parameters
    ----------
    sensor_name : Union[str, List[str]]
        Single sensor name or list of sensor names.
        Supported sensors: 'gps', 'imu', 'adc', 'inclinometer'

    Examples
    --------
    >>> # Load a single sensor
    >>> flight.add_sensor_data('gps')
    >>> print(flight.raw_data.payload_data.gps)
    >>> # Load multiple sensors at once
    >>> flight.add_sensor_data(['gps', 'imu', 'adc'])
    >>> # Access sensor data
    >>> gps_data = flight.raw_data.payload_data.gps
    >>> imu_data = flight.raw_data.payload_data.imu
    >>> # Or use dictionary-style
    >>> gps_data = flight['raw_data']['payload']['gps']
    >>> # Filter GPS data
    >>> high_accuracy = gps_data.filter(pl.col('accuracy') < 5.0)
    >>> # List all loaded sensors
    >>> print(flight.raw_data.payload_data.list_loaded_sensors())
    """
    sensor_path = Path(self.flight_info["aux_data_folder_path"]) / "sensors"

    if sensor_path.exists():
        if isinstance(sensor_name, str):
            sensor_name = [sensor_name]

        for sensor in sensor_name:
            sensor_data = self._read_sensor_data(sensor, sensor_path)
            if isinstance(sensor_data, dict) and (sensor != "inclinometer"):
                setattr(self.raw_data.payload_data, sensor, sensor_data["data"])
                self.flight_info["flight_info"].update(
                    {f"{sensor}_metadata": sensor_data["metadata"]}
                )

            else:
                setattr(self.raw_data.payload_data, sensor, sensor_data)
    else:
        logger.info("Sensor datasets are not available")

add_camera_data

add_camera_data(use_photogrammetry: bool = False, get_sony_angles: bool = True) -> None

Load camera data from the payload.

Supports both video cameras (Sony RX0 MarkII with telemetry, Alvium industrial) and photogrammetry-processed data. For video cameras, can compute Euler angles (roll, pitch, yaw) and quaternions from inertial measurement data.

Parameters:

Name Type Description Default
use_photogrammetry bool

If True, loads pre-processed photogrammetry results from proc_data folder. If False, loads camera data from aux_data/camera folder (video or logs).

False
get_sony_angles bool

For Sony cameras, whether to compute Euler angles and quaternions from telemetry gyro/accel data using AHRS (Madgwick) filter.

True

Raises:

Type Description
FileNotFoundError

If camera data folder or photogrammetry folder not found

Examples:

>>> # Load Sony RX0 MarkII video data with angles computed
>>> flight.add_camera_data(use_photogrammetry=False, get_sony_angles=True)
>>> # Load pre-processed photogrammetry results
>>> flight.add_camera_data(use_photogrammetry=True)
>>> # Load camera video data without computing angles
>>> flight.add_camera_data(use_photogrammetry=False, get_sony_angles=False)
Source code in pils/flight.py
def add_camera_data(
    self, use_photogrammetry: bool = False, get_sony_angles: bool = True
) -> None:
    """
    Load camera data from the payload.

    Supports both video cameras (Sony RX0 MarkII with telemetry, Alvium industrial)
    and photogrammetry-processed data. For video cameras, can compute Euler angles
    (roll, pitch, yaw) and quaternions from inertial measurement data.

    Parameters
    ----------
    use_photogrammetry : bool, default=False
        If True, loads pre-processed photogrammetry results from proc_data folder.
        If False, loads camera data from aux_data/camera folder (video or logs).
    get_sony_angles : bool, default=True
        For Sony cameras, whether to compute Euler angles and quaternions from
        telemetry gyro/accel data using AHRS (Madgwick) filter.

    Raises
    ------
    FileNotFoundError
        If camera data folder or photogrammetry folder not found

    Examples
    --------
    >>> # Load Sony RX0 MarkII video data with angles computed
    >>> flight.add_camera_data(use_photogrammetry=False, get_sony_angles=True)
    >>> # Load pre-processed photogrammetry results
    >>> flight.add_camera_data(use_photogrammetry=True)
    >>> # Load camera video data without computing angles
    >>> flight.add_camera_data(use_photogrammetry=False, get_sony_angles=False)
    """

    self.__use_photogrammetry = use_photogrammetry
    if use_photogrammetry:
        self.__camera_data_type = "photogrammetry"
        path = Path(self.flight_info["proc_data_folder_path"]) / "photogrammetry"
    else:
        self.__camera_data_type = "camera"
        path = Path(self.flight_info["aux_data_folder_path"]) / "camera"

    if path.exists():
        camera = Camera(path, use_photogrammetry=use_photogrammetry)

        camera.load_data()

        self.raw_data.payload_data.camera = camera.data[0]

        self.__camera_model = camera.data[1]

    else:
        logger.info("Camera Path does not exist")

sync

sync(target_rate: dict[str, float] | None = None, use_rtk_data: bool = True, common_time: bool = True, **kwargs) -> dict[str, DataFrame]

Synchronize flight data using GPS-based correlation.

Creates a Synchronizer instance, adds available data sources, performs synchronization, and stores the result in sync_data attribute.

Parameters:

Name Type Description Default
target_rate dict

Target sample rate in Hz of the different sensors; if None the following rates are applied: - 10 Hz for drone and litchi - 100 Hz for payload sensors (including inclinometer and ADC)

None
use_rtk_data bool

For DJI drones: if True, use RTK data; if False, use standard GPS

True
common_time bool

Interpolate all the data at a common time, with a sampliing frequency determined by the target_rate. If False, the time is just shifted and the other columns are not touched

True
**kwargs dict

Additional arguments passed to Synchronizer.synchronize()

{}

Returns:

Type Description
DataFrame

Synchronized data

Raises:

Type Description
ValueError

If no GPS payload data available (required as reference)

Examples:

>>> # Basic synchronization with RTK data
>>> flight.add_sensor_data(['gps', 'imu', 'adc'])
>>> flight.add_drone_data()
>>> sync_df = flight.sync(target_rate={'drone': 10.0, 'payload': 100.0}, use_rtk_data=True)
>>> # Use standard GPS instead of RTK
>>> sync_df = flight.sync(target_rate={'drone': 10.0}, use_rtk_data=False)
>>> # Synchronization is stored in flight.sync_data as a dict of DataFrames
>>> print(list(flight.sync_data.keys()))
Source code in pils/flight.py
def sync(
    self,
    target_rate: dict[str, float] | None = None,
    use_rtk_data: bool = True,
    common_time: bool = True,
    **kwargs,
) -> dict[str, pl.DataFrame]:
    """
    Synchronize flight data using GPS-based correlation.

    Creates a Synchronizer instance, adds available data sources,
    performs synchronization, and stores the result in sync_data attribute.

    Parameters
    ----------
    target_rate : dict, default=None
        Target sample rate in Hz of the different sensors; if None the following
        rates are applied:
        - 10 Hz for drone and litchi
        - 100 Hz for payload sensors (including inclinometer and ADC)
    use_rtk_data : bool, default=True
        For DJI drones: if True, use RTK data; if False, use standard GPS
    common_time: bool
        Interpolate all the data at a common time, with a sampliing frequency
        determined by the target_rate. If False, the time is just shifted and the
        other columns are not touched
    **kwargs : dict
        Additional arguments passed to Synchronizer.synchronize()

    Returns
    -------
    pl.DataFrame
        Synchronized data

    Raises
    ------
    ValueError
        If no GPS payload data available (required as reference)

    Examples
    --------
    >>> # Basic synchronization with RTK data
    >>> flight.add_sensor_data(['gps', 'imu', 'adc'])
    >>> flight.add_drone_data()
    >>> sync_df = flight.sync(target_rate={'drone': 10.0, 'payload': 100.0}, use_rtk_data=True)
    >>> # Use standard GPS instead of RTK
    >>> sync_df = flight.sync(target_rate={'drone': 10.0}, use_rtk_data=False)
    >>> # Synchronization is stored in flight.sync_data as a dict of DataFrames
    >>> print(list(flight.sync_data.keys()))
    """

    # Check if GPS payload data is available
    if not self.raw_data.payload_data or "gps" not in self.raw_data.payload_data:
        raise ValueError(
            "GPS payload data is required as reference timebase. "
            "Call flight.add_sensor_data(['gps']) first."
        )

    # Create synchronizer
    sync = Synchronizer()

    # Add GPS payload as reference (mandatory)
    gps_sensor = self.raw_data.payload_data["gps"]
    gps_data = gps_sensor.data if hasattr(gps_sensor, "data") else gps_sensor
    sync.add_gps_reference(
        gps_data,
        timestamp_col="timestamp",
        alt_col="posllh_height",
        lat_col="posllh_lat",
        lon_col="posllh_lon",
    )

    # Add drone GPS if available
    drone_data = self.raw_data.drone_data.drone
    drone_has_data = (isinstance(drone_data, dict) and len(drone_data) > 0) or (
        isinstance(drone_data, pl.DataFrame) and len(drone_data) > 0
    )

    if target_rate is None:
        target_rate = {}

    if drone_has_data:
        # Ensure target_rate dict has drone key with default 10 Hz
        if "drone" not in target_rate:
            target_rate["drone"] = 10.0
        drone_df = drone_data

        if "dji" in self.__drone_model.lower():
            timestamp_col = "timestamp"

            if use_rtk_data:
                lat_col = "RTK:lat_p"
                lon_col = "RTK:lon_p"
                alt_col = "RTK:hmsl_p"
            else:
                lat_col = "GPS:Latitude"
                lon_col = "GPS:Longitude"
                alt_col = "GPS:heightMSL"

        else:
            timestamp_col = "timestamp"
            lat_col = "Latitude"
            lon_col = "Longitude"
            alt_col = "heightMSL"

        sync.add_drone_gps(
            drone_df,
            timestamp_col=timestamp_col,
            lat_col=lat_col,
            lon_col=lon_col,
            alt_col=alt_col,
        )

    # Add litchi GPS if available
    if len(self.raw_data.drone_data.litchi) > 0:
        litchi_df = self.raw_data.drone_data.litchi
        if (
            isinstance(litchi_df, pl.DataFrame)
            and "latitude" in litchi_df.columns
            and "longitude" in litchi_df.columns
        ):
            if "drone" not in target_rate:
                target_rate["drone"] = 10.0

            sync.add_litchi_gps(litchi_df)

    # Add inclinometer if available
    if "inclinometer" in self.raw_data.payload_data:
        if "inclinometer" not in target_rate:
            target_rate["inclinometer"] = 100.0

        incl_sensor = self.raw_data.payload_data["inclinometer"]
        incl_data = (
            incl_sensor.data if hasattr(incl_sensor, "data") else incl_sensor
        )
        if self.__inclinometer == "imx5":
            incl_data = incl_data["INS"]
        sync.add_inclinometer(incl_data, self.__inclinometer)

    # Add Camera data if available

    if "camera" in self.raw_data.payload_data:
        sync.add_camera(
            self.raw_data.payload_data["camera"],
            use_photogrammetry=self.__use_photogrammetry,
            camera_model=self.__camera_model,
        )

    # Add other payload sensors
    payload = self.raw_data.payload_data

    # Add ADC if available
    if "adc" in payload:
        if "payload" not in target_rate:
            target_rate["payload"] = 100.0
        adc_sensor = payload["adc"]
        adc_data = adc_sensor.data if hasattr(adc_sensor, "data") else adc_sensor
        sync.add_payload_sensor("adc", adc_data)

    # Add LM76 if available
    if "lm76" in payload:
        if "payload" not in target_rate:
            target_rate["payload"] = 100.0
        lm76_sensor = payload["lm76"]
        lm76_data = (
            lm76_sensor.data if hasattr(lm76_sensor, "data") else lm76_sensor
        )
        sync.add_payload_sensor("lm76", lm76_data)

    # Add IMU sensors if available
    if "imu" in payload:
        if "payload" not in target_rate:
            target_rate["payload"] = 100.0

        imu_sensor = payload["imu"]
        if hasattr(imu_sensor, "barometer") and imu_sensor.barometer is not None:
            sync.add_payload_sensor("imu_barometer", imu_sensor.barometer)
        if (
            hasattr(imu_sensor, "accelerometer")
            and imu_sensor.accelerometer is not None
        ):
            sync.add_payload_sensor("imu_accelerometer", imu_sensor.accelerometer)
        if hasattr(imu_sensor, "gyroscope") and imu_sensor.gyroscope is not None:
            sync.add_payload_sensor("imu_gyroscope", imu_sensor.gyroscope)
        if (
            hasattr(imu_sensor, "magnetometer")
            and imu_sensor.magnetometer is not None
        ):
            sync.add_payload_sensor("imu_magnetometer", imu_sensor.magnetometer)

    # Perform synchronization
    self.sync_data = sync.synchronize(
        target_rate=target_rate,
        common_time=common_time,
        **kwargs,
    )

    return self.sync_data

to_hdf5

to_hdf5(filepath: str | Path | None = None, sync_metadata: dict[str, Any] | None = None) -> str

Save flight data to HDF5 file.

Saves metadata and raw_data hierarchy.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Path to output HDF5 file

None
sync_metadata Dict[str, Any]

Additional metadata to store with synchronized data revision. Will be saved as attributes on the revision group. Example: {'comment': 'Initial sync', 'target_rate': 10.0}

None

Returns:

Type Description
str

Timestamp string for the save operation

Raises:

Type Description
ImportError

If h5py is not installed

ValueError

If no data to save

Examples:

>>> # Save raw data
>>> flight.to_hdf5('flight_001.h5')
>>> # Save with sync metadata
>>> flight.to_hdf5('flight_001.h5', sync_metadata={'comment': 'High rate sync', 'rate': 100.0})
>>> # For synchronization, use Synchronizer separately:
>>> from pils.synchronizer import Synchronizer
>>> sync = Synchronizer()
>>> sync.add_gps_reference(flight.raw_data.payload_data.gps)
>>> # ... add other sources ...
>>> result = sync.synchronize(target_rate={'drone': 10.0, 'payload': 100.0})
Source code in pils/flight.py
def to_hdf5(
    self,
    filepath: str | Path | None = None,
    sync_metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save flight data to HDF5 file.

    Saves metadata and raw_data hierarchy.

    Parameters
    ----------
    filepath : Union[str, Path], optional
        Path to output HDF5 file
    sync_metadata : Dict[str, Any], optional
        Additional metadata to store with synchronized data revision.
        Will be saved as attributes on the revision group.
        Example: {'comment': 'Initial sync', 'target_rate': 10.0}

    Returns
    -------
    str
        Timestamp string for the save operation

    Raises
    ------
    ImportError
        If h5py is not installed
    ValueError
        If no data to save

    Examples
    --------
    >>> # Save raw data
    >>> flight.to_hdf5('flight_001.h5')
    >>> # Save with sync metadata
    >>> flight.to_hdf5('flight_001.h5', sync_metadata={'comment': 'High rate sync', 'rate': 100.0})
    >>> # For synchronization, use Synchronizer separately:
    >>> from pils.synchronizer import Synchronizer
    >>> sync = Synchronizer()
    >>> sync.add_gps_reference(flight.raw_data.payload_data.gps)
    >>> # ... add other sources ...
    >>> result = sync.synchronize(target_rate={'drone': 10.0, 'payload': 100.0})
    """

    if filepath:
        filepath = Path(filepath)
        filepath.parent.mkdir(parents=True, exist_ok=True)

    else:
        filepath = Path(self.flight_info["proc_data_folder_path"])

    with h5py.File(str(filepath), "a") as f:
        # Save metadata
        self._save_metadata_to_hdf5(f)

        # Save raw data
        self._save_raw_data_to_hdf5(f)

        # Save sync_data if available
        if self.sync_data is not None and len(self.sync_data) > 0:
            self._save_sync_data_to_hdf5(f, sync_metadata)

    return _get_current_timestamp()

__getitem__

__getitem__(key)

Dictionary-style access to flight data.

Parameters:

Name Type Description Default
key str

Key to access ('raw_data' or 'metadata')

required

Returns:

Type Description
object

Corresponding data object

Raises:

Type Description
KeyError

If key is not found

Examples:

>>> # Access raw data
>>> raw_data = flight['raw_data']
>>> # Access metadata
>>> metadata = flight['metadata']
>>> # Chain dictionary access
>>> drone_data = flight['raw_data']['drone_data']['drone']
Source code in pils/flight.py
def __getitem__(self, key):
    """
    Dictionary-style access to flight data.

    Parameters
    ----------
    key : str
        Key to access ('raw_data' or 'metadata')

    Returns
    -------
    object
        Corresponding data object

    Raises
    ------
    KeyError
        If key is not found

    Examples
    --------
    >>> # Access raw data
    >>> raw_data = flight['raw_data']
    >>> # Access metadata
    >>> metadata = flight['metadata']
    >>> # Chain dictionary access
    >>> drone_data = flight['raw_data']['drone_data']['drone']
    """
    if key == "raw_data":
        return self.raw_data
    elif key == "metadata":
        return self.metadata
    else:
        raise KeyError(f"Key '{key}' not found")