Skip to content

DJIDrone

DJIDrone

DJIDrone

DJIDrone(path: str | Path, source_format: str | None = None)

DJI Drone data loader supporting CSV and DAT binary formats.

Handles loading and processing of DJI drone flight logs from both CSV exports and encrypted DAT binary files.

Attributes: path: Path to drone data file data: Loaded drone data (DataFrame or dict of DataFrames) sync_params: Synchronization parameters (slope, intercept) source_format: Format of source data ('csv' or 'dat') aligned_df: Aligned DataFrame after GPS synchronization

Initialize DJIDrone loader.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to DJI drone data file (CSV or DAT).

required
source_format Optional[str]

Optional format specification ('csv' or 'dat').

None
Source code in pils/drones/DJIDrone.py
def __init__(self, path: str | Path, source_format: str | None = None) -> None:
    """Initialize DJIDrone loader.

    Parameters
    ----------
    path : Union[str, Path]
        Path to DJI drone data file (CSV or DAT).
    source_format : Optional[str], optional
        Optional format specification ('csv' or 'dat').
    """
    self.path = path
    self.data: (
        dict[str, pl.DataFrame] | pl.DataFrame
    ) = {}  # Dictionary or DataFrame
    self.sync_params: tuple[float, float] | None = (
        None  # Store (slope, intercept) from Gaussian sync
    )
    self.source_format: str | None = None  # Track if data came from CSV or DAT
    self.aligned_df: pl.DataFrame | None = None  # Store aligned DataFrame

load_data

load_data(cols: list[str] | None = None, use_dat: bool = True, remove_duplicate: bool = False, correct_timestamp: bool = True, polars_interpolation: bool = True, align: bool = True) -> None

"Load and filter drone data from a CSV or DAT file.

The function: - Loads only specified columns (for CSV). - Converts 'GPS:dateTimeStamp' to datetime. - Filters out rows with missing or zero values in critical columns. - Drops any columns that are fully NaN or zero using drop_nan_and_zero_cols. - Removes consecutive duplicate position samples.

Parameters:

Name Type Description Default
cols Optional[List[str]]

List of columns to load (for CSV files). Defaults to key RTK and timestamp fields.

None
use_dat bool

If True, try to load from DAT file instead of CSV.

True
remove_duplicate bool

If True, remove consecutive duplicate position samples.

False
correct_timestamp bool

If True, correct timestamps.

True
polars_interpolation bool

If True, use polars for interpolation.

True
align bool

If True, align DAT file data with GPS.

True
Source code in pils/drones/DJIDrone.py
def load_data(
    self,
    cols: list[str] | None = None,
    use_dat: bool = True,
    remove_duplicate: bool = False,
    correct_timestamp: bool = True,
    polars_interpolation: bool = True,
    align: bool = True,
) -> None:
    """ "Load and filter drone data from a CSV or DAT file.

    The function:
    - Loads only specified columns (for CSV).
    - Converts 'GPS:dateTimeStamp' to datetime.
    - Filters out rows with missing or zero values in critical columns.
    - Drops any columns that are fully NaN or zero using `drop_nan_and_zero_cols`.
    - Removes consecutive duplicate position samples.

    Parameters
    ----------
    cols : Optional[List[str]], optional
        List of columns to load (for CSV files). Defaults to key RTK and timestamp fields.
    use_dat : bool, optional
        If True, try to load from DAT file instead of CSV.
    remove_duplicate : bool, optional
        If True, remove consecutive duplicate position samples.
    correct_timestamp : bool, optional
        If True, correct timestamps.
    polars_interpolation : bool, optional
        If True, use polars for interpolation.
    align : bool, optional
        If True, align DAT file data with GPS.
    """
    # Auto-detect file format if not specified
    # if use_dat is None:
    #     file_extension = Path(self.path).suffix.lower()
    #     use_dat = file_extension in [".dat", ".bin"]

    if use_dat:
        self._load_from_dat()
        self.source_format = "dat"
    else:
        self._load_from_csv(cols)
        self.source_format = "csv"

    # Remove consecutive duplicate position samples
    if remove_duplicate:
        self._remove_consecutive_duplicates()

    if correct_timestamp:
        logger.info("Converting timestamps to milliseconds")
        if self.source_format == "csv":
            # For CSV format, self.data is a DataFrame
            assert isinstance(self.data, pl.DataFrame), (
                "Expected DataFrame for CSV format"
            )
            # Calculate mean offset from actual data, not expressions
            timestamp_vals = self.data.get_column("timestamp_old").to_numpy()
            tags = np.where(np.diff(timestamp_vals) > 0.5)[0] + 1

            offset_vals = self.data.get_column("Clock:offsetTime").to_numpy()

            offset_vals = offset_vals[tags].astype(np.float64)
            timestamp_vals = timestamp_vals[tags].astype(np.float64)
            mean_offset = float(np.mean(timestamp_vals - offset_vals))

            self.data = self.data.with_columns(
                ((pl.col("Clock:offsetTime") + mean_offset).cast(pl.Float64)).alias(
                    "timestamp"
                )
            )
        elif self.source_format == "dat":
            # For DAT format, align_datfile returns a DataFrame
            aligned = self.align_datfile(polars_interpolation=polars_interpolation)
            if aligned is not None:
                self.data = aligned

    else:
        if self.source_format == "DAT" and align:
            aligned = self.align_datfile(correct_timestamp=False)

            if aligned is not None:
                self.data = aligned

align_datfile

align_datfile(correct_timestamp: bool = True, sampling_freq: float = 5.0, polars_interpolation: bool = True) -> DataFrame | None

Align DAT file data using GPS synchronization.

Parameters:

Name Type Description Default
correct_timestamp bool

If True, correct timestamps using GPS synchronization.

True
sampling_freq float

Target sampling frequency in Hz.

5.0
polars_interpolation bool

If True, use Polars for interpolation.

True

Returns:

Type Description
Optional[DataFrame]

Aligned DataFrame, or None if alignment fails.

Source code in pils/drones/DJIDrone.py
def align_datfile(
    self,
    correct_timestamp: bool = True,
    sampling_freq: float = 5.0,
    polars_interpolation: bool = True,
) -> pl.DataFrame | None:
    """Align DAT file data using GPS synchronization.

    Parameters
    ----------
    correct_timestamp : bool, optional
        If True, correct timestamps using GPS synchronization.
    sampling_freq : float, optional
        Target sampling frequency in Hz.
    polars_interpolation : bool, optional
        If True, use Polars for interpolation.

    Returns
    -------
    Optional[pl.DataFrame]
        Aligned DataFrame, or None if alignment fails.
    """
    # Ensure self.data is a dict for DAT format
    assert isinstance(self.data, dict), "Expected dict for DAT format"

    if correct_timestamp:
        _ = self.get_tick_offset()

        if polars_interpolation:
            tmp = pl.DataFrame(
                {
                    "tick": pl.Series([], dtype=pl.Int64),
                    "msg_type": pl.Series([], dtype=pl.Int64),
                }
            )

            for _i, key in enumerate(self.data):
                tmp = tmp.join(
                    self.data[key],
                    on=["tick", "msg_type"],
                    how="full",
                    coalesce=True,
                ).sort("tick")

            numeric_cols = [
                col
                for col in tmp.columns
                if tmp[col].dtype in [pl.Float32, pl.Float64, pl.Int32, pl.Int64]
            ]
            exclude_cols = {"tick", "msg_type"}

            for col in numeric_cols:
                if col not in exclude_cols:
                    tmp = tmp.with_columns(
                        pl.col(col).interpolate_by("tick").alias(col)
                    )

            aligned_df = tmp

        else:
            gps_df: pl.DataFrame = self.data["GPS"]
            rtk_df: pl.DataFrame = self.data["RTK"]

            # Determine the start and end ticks based on the overlap of the two datasets
            gps_min = gps_df.get_column("tick").min()
            gps_max = gps_df.get_column("tick").max()
            rtk_min = rtk_df.get_column("tick").min()
            rtk_max = rtk_df.get_column("tick").max()

            if (
                gps_min is None
                or gps_max is None
                or rtk_min is None
                or rtk_max is None
            ):
                logger.warning("Could not determine tick range for alignment.")
                return None

            # Cast to float to ensure numeric comparison
            gps_min_f, gps_max_f = float(gps_min), float(gps_max)  # type: ignore
            rtk_min_f, rtk_max_f = float(rtk_min), float(rtk_max)  # type: ignore

            start_tick = max(gps_min_f, rtk_min_f)
            end_tick = min(gps_max_f, rtk_max_f)

            # Logic limits for the ticks to ensure valid range
            if start_tick >= end_tick:
                logger.warning("No overlapping data found between GPS and RTK.")
                return None

            # Create the aligned tick grid based on sampling frequency
            tick_freq = 4_500_000.0
            tick_step = tick_freq / sampling_freq
            target_ticks = np.arange(start_tick, end_tick, tick_step)

            logger.info(
                f"Target ticks: {len(target_ticks)}, {start_tick:.2f} to {end_tick:.2f}"
            )

            aligned_data: dict[str, np.ndarray] = {"corrected_tick": target_ticks}

            def interpolate_columns(df: pl.DataFrame, exclude_cols: set):
                # Ensure unique and sorted by corrected_tick for reliable interpolation
                x = df.get_column("tick").to_numpy()

                for col in df.columns:
                    if col in exclude_cols:
                        continue

                    # Skip if column is not numeric
                    if df[col].dtype not in [
                        pl.Float32,
                        pl.Float64,
                        pl.Int32,
                        pl.Int64,
                        pl.UInt32,
                        pl.UInt64,
                    ]:
                        continue

                    y = df.get_column(col).to_numpy()
                    try:
                        f = interp1d(
                            x,
                            y,
                            kind="linear",
                            bounds_error=False,
                            fill_value=np.nan,
                        )
                        aligned_data[col] = f(target_ticks)
                    except Exception as e:
                        logger.warning(f"Failed to interpolate column {col}: {e}")

            # Columns to exclude from generic interpolation
            common_exclude = {
                "tick",
                "date",
                "time",
                "datetime",
                "timestamp",
            }
            gps_exclude = common_exclude.union({"GPS:date", "GPS:time"})
            rtk_exclude = common_exclude.union({"RTK:date", "RTK:time"})

            interpolate_columns(gps_df, gps_exclude)
            interpolate_columns(rtk_df, rtk_exclude)

            aligned_df = pl.DataFrame(aligned_data)

            aligned_df = aligned_df.with_columns(
                (pl.col("timestamp") * 1000)
                .cast(pl.Int64)
                .cast(pl.Datetime("ms"))
                .alias("datetime_converted")
            )

            min_val = aligned_data["timestamp"].min()
            max_val = aligned_data["timestamp"].max()

            logger.info(f"Timestamp corrected {min_val}, {max_val}")

            # Ensure timestamp is present and maybe sort columns
            self.aligned_df = aligned_df

    else:
        logger.info("Alignment with no timestamp correction applied")

        base_tick = self.data["GPS"].get_column("tick")[0]

        tmp = pl.DataFrame(
            {
                "tick": pl.Series([], dtype=pl.Int64),
                "msg_type": pl.Series([], dtype=pl.Int64),
            }
        )

        for _, key in enumerate(self.data):
            tmp = tmp.join(
                self.data[key], on=["tick", "msg_type"], how="full", coalesce=True
            ).sort("tick")

        numeric_cols = [
            col
            for col in tmp.columns
            if tmp[col].dtype in [pl.Float32, pl.Float64, pl.Int32, pl.Int64]
        ]
        exclude_cols = {"tick", "msg_type"}

        for col in numeric_cols:
            if col not in exclude_cols:
                tmp = tmp.with_columns(
                    pl.col(col).interpolate_by("tick").alias(col)
                )

        aligned_df = tmp

        aligned_df = aligned_df.with_columns(
            ((pl.col("tick") - base_tick) / 4_500_000.0).alias("offset")
        )

        timestamp_vals = aligned_df.get_column("timestamp_old").to_numpy()
        tags = np.where(np.diff(timestamp_vals) > 0.5)[0] + 1

        offset_vals = aligned_df.get_column("offset").to_numpy()

        offset_vals = offset_vals[tags].astype(np.float64)
        timestamp_vals = timestamp_vals[tags].astype(np.float64)
        mean_offset = float(np.mean(timestamp_vals - offset_vals))

        aligned_df = aligned_df.with_columns(
            ((pl.col("offset") + mean_offset).cast(pl.Float64)).alias("timestamp")
        )

    return aligned_df