Skip to content

Synchronizer

Synchronizer

Synchronizer()

GPS-based correlation synchronizer with hierarchical reference.

This synchronizer uses cross-correlation to detect TIME offsets between data sources, with GPS payload as the mandatory reference timebase.

Correlation Methods: - GPS sources: NED position correlation (3D signal) - Inclinometer: Pitch angle correlation (1D signal) - Camera: Timestamp alignment with optional photogrammetry support

All synchronization outputs are TIME OFFSETS (seconds) to align data to GPS payload timebase.

Attributes:

Name Type Description
gps_payload Optional[DataFrame]

GPS payload data (reference timebase)

drone_gps Optional[DataFrame]

Optional drone GPS data

litchi_gps Optional[DataFrame]

Optional litchi GPS data

inclinometer Optional[DataFrame]

Optional inclinometer data

camera Optional[DataFrame]

Optional camera data (Sony or Alvium camera, or photogrammetry results)

other_payload Dict[str, DataFrame]

Other payload sensors (adc, imu, etc.)

offsets Dict[str, Dict[str, Any]]

Detected time offsets per source

synchronized_data Optional[DataFrame]

Final synchronized DataFrame

Examples:

>>> import polars as pl
>>> import numpy as np
>>> from pils.synchronizer import Synchronizer
>>> # Create sample GPS payload data (reference)
>>> t = np.linspace(0, 100, 1000)
>>> gps_payload = pl.DataFrame({
...     'timestamp': t,
...     'latitude': 45.0 + 0.001 * np.sin(0.1 * t),
...     'longitude': 10.0 + 0.001 * np.cos(0.1 * t),
...     'altitude': 100.0 + 10.0 * np.sin(0.05 * t),
... })
>>> # Create drone GPS data with 2-second time offset
>>> drone_gps = pl.DataFrame({
...     'timestamp': t + 2.0,  # Drone data 2s ahead
...     'latitude': 45.0 + 0.001 * np.sin(0.1 * (t + 2.0)),
...     'longitude': 10.0 + 0.001 * np.cos(0.1 * (t + 2.0)),
...     'altitude': 100.0 + 10.0 * np.sin(0.05 * (t + 2.0)),
... })
>>> # Initialize synchronizer
>>> sync = Synchronizer()
>>> # Add GPS payload as reference (mandatory)
>>> sync.add_gps_reference(gps_payload)
>>> # Add drone GPS for correlation
>>> sync.add_drone_gps(drone_gps)
>>> # Execute synchronization
>>> result = sync.synchronize(target_rate={'drone': 10.0}, common_time=True)
>>> # Check detected offsets
>>> print(sync.get_offset_summary())
Correlation Synchronizer - Detected Time Offsets
============================================================

DRONE_GPS Time Offset: 2.000 s Correlation: 0.998 Spatial Offset: 15.32 m East: 10.23 m North: 11.45 m Up: 0.12 m

>>> # Access synchronized data
>>> print(list(result.keys()))
['drone', 'reference_gps']
>>> # Verify time offset was applied
>>> assert abs(sync.offsets['drone_gps']['time_offset'] - 2.0) < 0.1

Initialize empty Synchronizer.

Source code in pils/synchronizer.py
def __init__(self):
    """Initialize empty Synchronizer."""
    # GPS payload is the mandatory reference timebase; all other sources
    # are optional and are registered through the add_* methods.
    self.gps_payload: pl.DataFrame | None = None

    # Optional sources aligned via correlation or timestamp matching.
    self.drone_gps: pl.DataFrame | None = None
    self.litchi_gps: pl.DataFrame | None = None
    self.inclinometer: pl.DataFrame | None = None
    self.camera: pl.DataFrame | None = None

    # Extra payload sensors keyed by sensor name (no correlation performed).
    self.other_payload: dict[str, pl.DataFrame] = {}

    # Outputs: detected time offsets per source, and the final result.
    self.offsets: dict[str, dict[str, Any]] = {}
    self.synchronized_data: dict[str, Any] | None = None

__clean_data staticmethod

__clean_data(time: ndarray, east: ndarray, north: ndarray, up: ndarray) -> tuple[ndarray, ndarray, ndarray, ndarray]

Remove outliers and NaN values from GPS position data using velocity thresholds.

Identifies erroneous GPS measurements by detecting velocity spikes that exceed physical thresholds. Removes both the outlier sample and the subsequent sample to eliminate corrupted segments.

Parameters:

Name Type Description Default
time ndarray

Timestamp array in seconds

required
east ndarray

East position component in meters

required
north ndarray

North position component in meters

required
up ndarray

Up (altitude) position component in meters

required

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray, ndarray]

Cleaned (time, east, north, up) arrays with outliers and NaN removed

Notes
  • Horizontal velocity threshold: 50 m/s
  • Vertical velocity threshold: 20 m/s
  • Removes sample immediately after outlier to avoid interpolation artifacts
  • Also removes any samples with NaN values in position

Examples:

>>> time = np.array([0, 1, 2, 3, 4])
>>> east = np.array([0, 1, 50, 3, 4])  # Jump at idx=2
>>> north = np.array([0, 1, 2, 3, 4])
>>> up = np.array([0, 1, 2, 3, 4])
>>> t_clean, e_clean, n_clean, u_clean = Synchronizer.__clean_data(
...     time, east, north, up
... )
>>> # Outliers at indices 2,3 (and next sample) are removed
Source code in pils/synchronizer.py
@staticmethod
def __clean_data(
    time: np.ndarray, east: np.ndarray, north: np.ndarray, up: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Remove outliers and NaN values from GPS position data using velocity thresholds.

    Identifies erroneous GPS measurements by detecting velocity spikes that exceed
    physical thresholds. Removes both the outlier sample and the subsequent sample
    to eliminate corrupted segments.

    Parameters
    ----------
    time : np.ndarray
        Timestamp array in seconds
    east : np.ndarray
        East position component in meters
    north : np.ndarray
        North position component in meters
    up : np.ndarray
        Up (altitude) position component in meters

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
        Cleaned (time, east, north, up) arrays with outliers and NaN removed

    Notes
    -----
    - Horizontal velocity threshold: 50 m/s
    - Vertical velocity threshold: 20 m/s
    - Removes sample immediately after outlier to avoid interpolation artifacts
    - Also removes any samples with NaN values in position
    - Duplicated timestamps (dt == 0) produce infinite velocities and are
      therefore flagged as outliers

    Examples
    --------
    >>> time = np.array([0, 1, 2, 3, 4])
    >>> east = np.array([0, 1, 50, 3, 4])  # Jump at idx=2
    >>> north = np.array([0, 1, 2, 3, 4])
    >>> up = np.array([0, 1, 2, 3, 4])
    >>> t_clean, e_clean, n_clean, u_clean = Synchronizer.__clean_data(
    ...     time, east, north, up
    ... )
    >>> # Outliers at indices 2,3 (and next sample) are removed
    """
    dt = np.diff(time)
    de = np.diff(east)
    dn = np.diff(north)
    du = np.diff(up)

    # Suppress divide-by-zero / invalid-value RuntimeWarnings caused by
    # duplicated or NaN timestamps; the resulting inf/NaN velocities are
    # handled correctly by the threshold tests and the NaN mask below.
    with np.errstate(divide="ignore", invalid="ignore"):
        velocity_horizontal = np.sqrt(de**2 + dn**2) / dt
        velocity_vertical = np.abs(du) / dt

    # Detect outliers based on velocity thresholds
    threshold_horizontal = 50.0  # m/s
    threshold_vertical = 20.0  # m/s

    outliers_h = np.where(velocity_horizontal > threshold_horizontal)[0]
    outliers_v = np.where(velocity_vertical > threshold_vertical)[0]
    # union1d returns sorted unique indices; cast guards against the empty
    # case where the result dtype may degrade to float.
    outliers = np.union1d(outliers_h, outliers_v).astype(int)

    good_mask = np.ones(len(time), dtype=bool)
    # A spike at diff index i implicates both samples i and i+1; since the
    # maximum diff index is len(time) - 2, outliers + 1 never goes out of
    # bounds.
    good_mask[outliers] = False
    good_mask[outliers + 1] = False  # Also remove the next sample

    # Also remove NaN values
    nan_mask = ~(np.isnan(east) | np.isnan(north) | np.isnan(up) | np.isnan(time))
    good_mask = good_mask & nan_mask

    return time[good_mask], east[good_mask], north[good_mask], up[good_mask]

add_gps_reference

add_gps_reference(gps_data: DataFrame, timestamp_col: str = 'timestamp', lat_col: str = 'posllh_lat', lon_col: str = 'posllh_lon', alt_col: str = 'posllh_height') -> None

Set GPS payload as reference timebase (mandatory).

Parameters:

Name Type Description Default
gps_data DataFrame

Polars DataFrame with GPS data

required
timestamp_col str

Name of timestamp column

'timestamp'
lat_col str

Name of latitude column

'posllh_lat'
lon_col str

Name of longitude column

'posllh_lon'
alt_col str

Name of altitude column

'posllh_height'

Raises:

Type Description
ValueError

If required columns are missing or data is empty

Source code in pils/synchronizer.py
def add_gps_reference(
    self,
    gps_data: pl.DataFrame,
    timestamp_col: str = "timestamp",
    lat_col: str = "posllh_lat",
    lon_col: str = "posllh_lon",
    alt_col: str = "posllh_height",
) -> None:
    """
    Register the GPS payload DataFrame as the reference timebase (mandatory).

    Every other source is later aligned onto this timebase. The first
    altitude sample is additionally stored as the reference height used to
    lift relative-altitude sources.

    Parameters
    ----------
    gps_data : pl.DataFrame
        Polars DataFrame holding the payload GPS track
    timestamp_col : str, default='timestamp'
        Column holding time in seconds
    lat_col : str, default='posllh_lat'
        Column holding latitude
    lon_col : str, default='posllh_lon'
        Column holding longitude
    alt_col : str, default='posllh_height'
        Column holding altitude

    Raises
    ------
    ValueError
        If any required column is absent or the DataFrame has no rows
    """
    expected = [timestamp_col, lat_col, lon_col, alt_col]
    absent = [name for name in expected if name not in gps_data.columns]
    if absent:
        raise ValueError(f"GPS payload missing columns: {absent}")

    if len(gps_data) == 0:
        raise ValueError("GPS payload data is empty")

    # Keep the first altitude sample as the reference height for sources
    # that report relative altitudes (e.g. Litchi).
    self.__ref_height = gps_data[alt_col][0]
    self.__ref_names = {
        "timestamp": timestamp_col,
        "lat_col": lat_col,
        "lon_col": lon_col,
        "alt_col": alt_col,
    }

    self.gps_payload = gps_data
    logger.info(f"Set GPS payload reference with {len(gps_data)} samples")

add_drone_gps

add_drone_gps(gps_data: DataFrame | dict[str, DataFrame], timestamp_col: str = 'timestamp', lat_col: str = 'latitude', lon_col: str = 'longitude', alt_col: str = 'altitude') -> None

Add drone GPS data for correlation.

Parameters:

Name Type Description Default
gps_data DataFrame | Dict[str, DataFrame]

Polars DataFrame with GPS data, or dict containing GPS data

required
timestamp_col str

Name of timestamp column

'timestamp'
lat_col str

Name of latitude column

'latitude'
lon_col str

Name of longitude column

'longitude'
alt_col str

Name of altitude column

'altitude'

Raises:

Type Description
ValueError

If required columns are missing or data is empty

Source code in pils/synchronizer.py
def add_drone_gps(
    self,
    gps_data: pl.DataFrame | dict[str, pl.DataFrame],
    timestamp_col: str = "timestamp",
    lat_col: str = "latitude",
    lon_col: str = "longitude",
    alt_col: str = "altitude",
) -> None:
    """
    Add drone GPS data for correlation.

    Parameters
    ----------
    gps_data : pl.DataFrame | Dict[str, pl.DataFrame]
        Polars DataFrame with GPS data, or dict containing GPS data
            timestamp_col : str, default='timestamp'
        Name of timestamp column
    lat_col : str, default='latitude'
        Name of latitude column
    lon_col : str, default='longitude'
        Name of longitude column
    alt_col : str, default='altitude'
        Name of altitude column

    Raises
    ------
    ValueError
        If required columns are missing or data is empty
    """
    # Extract DataFrame from dict if needed
    if isinstance(gps_data, dict):
        if "gps" in gps_data:
            df = gps_data["gps"]
        elif "GPS" in gps_data:
            df = gps_data["GPS"]
        else:
            df = next(iter(gps_data.values()))
    else:
        df = gps_data

    required_cols = [timestamp_col, lat_col, lon_col, alt_col]
    missing_cols = [col for col in required_cols if col not in df.columns]

    if missing_cols:
        raise ValueError(f"Drone GPS missing columns: {missing_cols}")

    if len(df) == 0:
        raise ValueError("Drone GPS data is empty")

    self.drone_gps = df

    self.__drone_names = {
        "timestamp": timestamp_col,
        "lat_col": lat_col,
        "lon_col": lon_col,
        "alt_col": alt_col,
    }

    logger.info(f"Added drone GPS with {len(gps_data)} samples")

add_litchi_gps

add_litchi_gps(gps_data: DataFrame, timestamp_col: str = 'timestamp', lat_col: str = 'latitude', lon_col: str = 'longitude', alt_col: str = 'altitude(m)') -> None

Add Litchi GPS data for correlation.

Parameters:

Name Type Description Default
gps_data DataFrame

Polars DataFrame with GPS data

required
timestamp_col str

Name of timestamp column

'timestamp'
lat_col str

Name of latitude column

'latitude'
lon_col str

Name of longitude column

'longitude'
alt_col str

Name of altitude column

'altitude(m)'

Raises:

Type Description
ValueError

If required columns are missing or data is empty

Source code in pils/synchronizer.py
def add_litchi_gps(
    self,
    gps_data: pl.DataFrame,
    timestamp_col: str = "timestamp",
    lat_col: str = "latitude",
    lon_col: str = "longitude",
    alt_col: str = "altitude(m)",
) -> None:
    """
    Add Litchi GPS data for correlation.

    Litchi altitudes are relative, so the reference height captured by
    ``add_gps_reference()`` is added to the altitude column. Call
    ``add_gps_reference()`` before this method.

    Parameters
    ----------
    gps_data : pl.DataFrame
        Polars DataFrame with GPS data
    timestamp_col : str, default='timestamp'
        Name of timestamp column
    lat_col : str, default='latitude'
        Name of latitude column
    lon_col : str, default='longitude'
        Name of longitude column
    alt_col : str, default='altitude(m)'
        Name of altitude column

    Raises
    ------
    ValueError
        If required columns are missing or data is empty
    AttributeError
        If ``add_gps_reference()`` has not been called first (no reference
        height available)
    """
    required_cols = [timestamp_col, lat_col, lon_col, alt_col]
    missing_cols = [col for col in required_cols if col not in gps_data.columns]

    if missing_cols:
        raise ValueError(f"Litchi GPS missing columns: {missing_cols}")

    if len(gps_data) == 0:
        raise ValueError("Litchi GPS data is empty")

    # Lift relative Litchi altitude onto the payload reference height.
    # Assign only after the adjustment succeeds so litchi_gps is never left
    # holding unadjusted altitudes if with_columns raises.
    self.litchi_gps = gps_data.with_columns(pl.col(alt_col) + self.__ref_height)

    self.__litchi_names = {
        "timestamp": timestamp_col,
        "lat_col": lat_col,
        "lon_col": lon_col,
        "alt_col": alt_col,
        # Fixed Litchi column name, used later for pitch correlation.
        "pitch": "gimbalPitch",
    }

    logger.info(f"Added litchi GPS with {len(gps_data)} samples")

add_inclinometer

add_inclinometer(inclinometer_data: DataFrame, inclinometer_type: str, timestamp_col: str = 'timestamp', pitch_col: str = 'pitch') -> None

Add inclinometer data for pitch-based correlation.

Parameters:

Name Type Description Default
inclinometer_data DataFrame

Polars DataFrame with inclinometer data

required
inclinometer_type str

Type of inclinometer sensor (e.g., 'imx5')

required
timestamp_col str

Name of timestamp column

'timestamp'
pitch_col str

Name of pitch column

'pitch'

Raises:

Type Description
ValueError

If required columns are missing or data is empty

Source code in pils/synchronizer.py
def add_inclinometer(
    self,
    inclinometer_data: pl.DataFrame,
    inclinometer_type: str,
    timestamp_col: str = "timestamp",
    pitch_col: str = "pitch",
) -> None:
    """
    Add inclinometer data for pitch-based correlation.

    Parameters
    ----------
    inclinometer_data : pl.DataFrame
        Polars DataFrame with inclinometer data
    inclinometer_type : str
        Type of inclinometer sensor (e.g., 'imx5'); informational only —
        the column mapping is always taken from ``timestamp_col`` and
        ``pitch_col``
    timestamp_col : str, default='timestamp'
        Name of timestamp column
    pitch_col : str, default='pitch'
        Name of pitch column

    Raises
    ------
    ValueError
        If required columns are missing or data is empty
    """
    required_cols = [timestamp_col, pitch_col]
    missing_cols = [
        col for col in required_cols if col not in inclinometer_data.columns
    ]

    if missing_cols:
        raise ValueError(f"Inclinometer missing columns: {missing_cols}")

    if len(inclinometer_data) == 0:
        raise ValueError("Inclinometer data is empty")

    self.inclinometer = inclinometer_data

    # Always record the column mapping. Previously this was set only for
    # 'imx5' (hard-coded to 'timestamp'/'pitch', ignoring the validated
    # column arguments), so any other sensor type caused an AttributeError
    # later in synchronize(). With the default arguments this is identical
    # to the old 'imx5' behavior.
    self.__inclinometer_names = {"timestamp": timestamp_col, "pitch": pitch_col}

    logger.info(f"Added inclinometer with {len(inclinometer_data)} samples")

add_payload_sensor

add_payload_sensor(sensor_name: str, sensor_data: DataFrame) -> None

Add other payload sensor data (no correlation, simple alignment).

Parameters:

Name Type Description Default
sensor_name str

Name of sensor (e.g., 'adc', 'imu')

required
sensor_data DataFrame

Polars DataFrame with sensor data

required

Raises:

Type Description
ValueError

If sensor data is empty

Source code in pils/synchronizer.py
def add_payload_sensor(
    self,
    sensor_name: str,
    sensor_data: pl.DataFrame,
) -> None:
    """
    Register an auxiliary payload sensor.

    These sensors are not correlated against the reference; during
    synchronization they are simply aligned to the payload timebase.

    Parameters
    ----------
    sensor_name : str
        Identifier for the sensor (e.g., 'adc', 'imu')
    sensor_data : pl.DataFrame
        Polars DataFrame with the sensor samples

    Raises
    ------
    ValueError
        If the DataFrame has no rows
    """
    if not len(sensor_data):
        raise ValueError(f"Sensor {sensor_name} data is empty")

    self.other_payload[sensor_name] = sensor_data
    logger.info(
        f"Added payload sensor '{sensor_name}' with {len(sensor_data)} samples"
    )

synchronize

synchronize(target_rate: dict[str, float], common_time: bool = True) -> dict[str, Any]

Execute correlation-based synchronization.

Detects time offsets for all sources using correlation, then interpolates to GPS payload timebase at target rates.

Parameters:

Name Type Description Default
target_rate dict

Target sample rates in Hz for each source Keys: "drone", "litchi", "inclinometer" and "payload" Values: float sample rate in Hz

required
common_time bool

Interpolate all the data at a common time, with a sampling frequency determined by the target_rate. If False, the time is only shifted and the other columns are left untouched

True

Returns:

Type Description
dict

Synchronized data dictionary with interpolated values for each source. Keys: "drone", "litchi", "reference_gps", "inclinometer", "camera", "payload"

GPS sources (drone, litchi, reference_gps) use standardized coordinate columns: - latitude: Latitude in degrees (WGS84) - longitude: Longitude in degrees (WGS84) - altitude: Altitude in meters (ellipsoidal height) - timestamp: Time in seconds

Non-coordinate columns preserve their original names.

Raises:

Type Description
RuntimeError

If GPS payload reference not set

Source code in pils/synchronizer.py
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
def synchronize(
    self,
    target_rate: dict[str, float],
    common_time: bool = True,
) -> dict[str, Any]:
    """
    Execute correlation-based synchronization.

    Detects time offsets for all sources using correlation, then
    interpolates to GPS payload timebase at target rates.

    Parameters
    ----------
    target_rate : dict
        Target sample rates in Hz for each source
        Keys: "drone", "litchi", "inclinometer" and "payload"
        Values: float sample rate in Hz
    common_time: bool
        Interpolate all the data at a common time, with a sampliing frequency
        determined by the target_rate. If False, the time is just shifted and the
        other columns are not touched

    Returns
    -------
    dict
        Synchronized data dictionary with interpolated values for each source.
        Keys: "drone", "litchi", "reference_gps", "inclinometer", "camera", "payload"

        GPS sources (drone, litchi, reference_gps) use standardized coordinate columns:
        - latitude: Latitude in degrees (WGS84)
        - longitude: Longitude in degrees (WGS84)
        - altitude: Altitude in meters (ellipsoidal height)
        - timestamp: Time in seconds

        Non-coordinate columns preserve their original names.

    Raises
    ------
    RuntimeError
        If GPS payload reference not set
    """
    if self.gps_payload is None:
        raise RuntimeError(
            "GPS payload reference not set. Call add_gps_reference() first."
        )

    # Get GPS payload timebase
    gps_time = self.gps_payload["timestamp"].to_numpy()
    t_start, t_end = float(gps_time[0]), float(gps_time[-1])

    # Detect offsets for each source
    self.offsets = {}

    # Drone GPS offset detection
    if self.drone_gps is not None:
        logger.info("Detecting drone GPS offset via NED correlation...")
        result = self._find_gps_offset(
            time1=self.gps_payload[self.__ref_names["timestamp"]].to_numpy(),
            lat1=self.gps_payload[self.__ref_names["lat_col"]].to_numpy(),
            lon1=self.gps_payload[self.__ref_names["lon_col"]].to_numpy(),
            alt1=self.gps_payload[self.__ref_names["alt_col"]].to_numpy(),
            time2=self.drone_gps[self.__drone_names["timestamp"]].to_numpy(),
            lat2=self.drone_gps[self.__drone_names["lat_col"]].to_numpy(),
            lon2=self.drone_gps[self.__drone_names["lon_col"]].to_numpy(),
            alt2=self.drone_gps[self.__drone_names["alt_col"]].to_numpy(),
        )
        if result:
            self.offsets["drone_gps"] = result
            logger.info(
                f"Drone GPS offset: {result['time_offset']:.3f}s (corr={result['correlation']:.3f})"
            )
        else:
            logger.warning("Failed to detect drone GPS offset")

    # Litchi GPS offset detection
    if self.litchi_gps is not None:
        logger.info("Detecting litchi GPS offset via NED correlation...")
        result = self._find_gps_offset(
            time1=self.gps_payload[self.__ref_names["timestamp"]].to_numpy(),
            lat1=self.gps_payload[self.__ref_names["lat_col"]].to_numpy(),
            lon1=self.gps_payload[self.__ref_names["lon_col"]].to_numpy(),
            alt1=self.gps_payload[self.__ref_names["alt_col"]].to_numpy(),
            time2=self.litchi_gps[self.__litchi_names["timestamp"]].to_numpy(),
            lat2=self.litchi_gps[self.__litchi_names["lat_col"]].to_numpy(),
            lon2=self.litchi_gps[self.__litchi_names["lon_col"]].to_numpy(),
            alt2=self.litchi_gps[self.__litchi_names["alt_col"]].to_numpy(),
        )
        if result:
            self.offsets["litchi_gps"] = result
            logger.info(
                f"Litchi GPS offset: {result['time_offset']:.3f}s (corr={result['correlation']:.3f})"
            )
        else:
            logger.warning("Failed to detect litchi GPS offset")

    # Inclinometer offset detection (using litchi gimbal pitch if available)
    if self.inclinometer is not None and self.litchi_gps is not None:
        # Check if litchi has pitch data
        if "gimbalPitch" in self.litchi_gps.columns:
            logger.info("Detecting inclinometer offset via pitch correlation...")
            result = self._find_pitch_offset(
                time1=self.litchi_gps[self.__litchi_names["timestamp"]].to_numpy(),
                pitch1=self.litchi_gps[self.__litchi_names["pitch"]].to_numpy(),
                time2=self.inclinometer[
                    self.__inclinometer_names["timestamp"]
                ].to_numpy(),
                pitch2=self.inclinometer[
                    self.__inclinometer_names["pitch"]
                ].to_numpy(),
            )
            if result:
                self.offsets["inclinometer"] = result
                logger.info(
                    f"Inclinometer offset (relative to Litchi): {result['time_offset']:.3f}s (corr={result['correlation']:.3f})"
                )
            else:
                logger.warning("Failed to detect inclinometer offset")

    if self.camera is not None:
        if self.__camera_model == "photogrammetry" or self.__camera_model == "sony":
            logger.info(
                f"Camera Model {self.__camera_model}, this implies use of pitch correlation"
            )

            if self.litchi_gps is not None:
                result = self._find_pitch_offset(
                    time1=self.litchi_gps[
                        self.__litchi_names["timestamp"]
                    ].to_numpy(),
                    pitch1=self.litchi_gps[self.__litchi_names["pitch"]].to_numpy(),
                    time2=self.camera[self.__camera_names["timestamp"]].to_numpy(),
                    pitch2=self.camera[self.__camera_names["pitch"]].to_numpy(),
                )
                if result:
                    self.offsets["camera"] = result
                    logger.info(
                        f"Camera offset (relative to Litchi): {result['time_offset']:.3f}s (corr={result['correlation']:.3f})"
                    )
                else:
                    logger.warning("Failed to detect camera offset")
            else:
                logger.warning(
                    "Litchi GPS data not available, skipping camera pitch correlation"
                )

        else:
            if self.__camera_model == "alvium":
                incl_offset = self.offsets.get("inclinometer", {}).get(
                    "time_offset", 0.0
                )

                self.offsets["camera"] = {"time_offset": incl_offset}

            else:
                logger.info(
                    f"Camera Model {self.__camera_model}, skipping pitch correlation"
                )
                logger.info(
                    "Using data timestamp and inclinometer offset as camera offset"
                )

    sync_data = {}

    for key in self.offsets.keys():
        if key.lower() == "drone_gps":
            sync_data["drone"] = {}

            if self.drone_gps is not None and "drone_gps" in self.offsets:
                offset = self.offsets["drone_gps"]["time_offset"]
                drone_time = (
                    self.drone_gps[self.__drone_names["timestamp"]].to_numpy()
                    + offset
                )

                if common_time:
                    n_samples = int((t_end - t_start) * target_rate["drone"]) + 1
                    target_time = np.linspace(t_start, t_end, n_samples)
                    sync_data["drone"][self.__drone_names["timestamp"]] = (
                        target_time
                    )

                    for col in self.drone_gps.columns:
                        if col == self.__drone_names["timestamp"]:
                            logger.info("Skipped Drone Timestamp")
                            continue
                        try:
                            values = self.drone_gps[col].to_numpy().astype(float)
                            interpolated = np.interp(
                                target_time,
                                drone_time,
                                values,
                                left=np.nan,
                                right=np.nan,
                            )
                            sync_data["drone"][f"{col}"] = interpolated
                        except ValueError:
                            logger.info(f"Skipped drone column: {col}")

                else:
                    for col in self.drone_gps.columns:
                        if col == self.__drone_names["timestamp"]:
                            sync_data["drone"][self.__drone_names["timestamp"]] = (
                                drone_time
                            )
                        else:
                            sync_data["drone"][col] = self.drone_gps[col]

        elif key.lower() == "litchi_gps":
            sync_data["litchi"] = {}

            if self.litchi_gps is not None and "litchi_gps" in self.offsets:
                offset = self.offsets["litchi_gps"]["time_offset"]
                litchi_time = (
                    self.litchi_gps[self.__litchi_names["timestamp"]].to_numpy()
                    + offset
                )

                if common_time:
                    n_samples = int((t_end - t_start) * target_rate["drone"]) + 1
                    target_time = np.linspace(t_start, t_end, n_samples)

                    sync_data["litchi"]["timestamp"] = target_time

                    for col in self.litchi_gps.columns:
                        if col == "timestamp":
                            continue
                        values = self.litchi_gps[col].to_numpy().astype(float)
                        interpolated = np.interp(
                            target_time,
                            litchi_time,
                            values,
                            left=np.nan,
                            right=np.nan,
                        )
                        sync_data["litchi"][f"{col}"] = interpolated

                else:
                    for col in self.litchi_gps.columns:
                        if col == self.__litchi_names["timestamp"]:
                            sync_data["litchi"][col] = litchi_time
                        else:
                            sync_data["litchi"][col] = self.litchi_gps[
                                col
                            ].to_numpy()

        elif key.lower() == "inclinometer":
            sync_data["inclinometer"] = {}

            if self.inclinometer is not None and "inclinometer" in self.offsets:
                # Inclinometer offset is relative to Litchi, not GPS payload
                # Need to add both: inclinometer-to-litchi + litchi-to-gps
                incl_offset = self.offsets["inclinometer"]["time_offset"]
                litchi_offset = self.offsets.get("litchi_gps", {}).get(
                    "time_offset", 0.0
                )
                total_offset = incl_offset + litchi_offset

                logger.info(
                    f"Applying inclinometer total offset: {total_offset:.3f}s "
                    f"(incl→litchi: {incl_offset:.3f}s + litchi→gps: {litchi_offset:.3f}s)"
                )

                if common_time:
                    n_samples = (
                        int((t_end - t_start) * target_rate["inclinometer"]) + 1
                    )
                    target_time = np.linspace(t_start, t_end, n_samples)

                    if isinstance(self.inclinometer, dict):
                        for key in self.inclinometer.keys():
                            inclinometer_time = (
                                self.inclinometer[key][
                                    self.__inclinometer_names["timestamp"]
                                ].to_numpy()
                                + total_offset
                            )

                            sync_data["inclinometer"][f"{key}_timestamp"] = (
                                target_time
                            )

                            for col in self.inclinometer[key].columns:
                                if col == self.__inclinometer_names["timestamp"]:
                                    continue
                                values = (
                                    self.inclinometer[key][col]
                                    .to_numpy()
                                    .astype(float)
                                )
                                interpolated = np.interp(
                                    target_time,
                                    inclinometer_time,
                                    values,
                                    left=np.nan,
                                    right=np.nan,
                                )
                                sync_data["inclinometer"][f"{key}_{col}"] = (
                                    interpolated
                                )

                    else:
                        inclinometer_time = (
                            self.inclinometer["timestamp"].to_numpy() + total_offset
                        )

                        sync_data["inclinometer"]["timestamp"] = target_time

                        for col in self.inclinometer.columns:
                            if col == "timestamp":
                                continue
                            values = self.inclinometer[col].to_numpy().astype(float)
                            interpolated = np.interp(
                                target_time,
                                inclinometer_time,
                                values,
                                left=np.nan,
                                right=np.nan,
                            )
                            sync_data["inclinometer"][f"{col}"] = interpolated

                else:
                    if isinstance(self.inclinometer, dict):
                        for key in self.inclinometer.keys():
                            for col in self.inclinometer[key].columns:
                                inclinometer_time = (
                                    self.inclinometer[key][
                                        self.__inclinometer_names["timestamp"]
                                    ].to_numpy()
                                    + total_offset
                                )
                                if col == self.__inclinometer_names["timestamp"]:
                                    sync_data["inclinometer"][f"{key}_{col}"] = (
                                        inclinometer_time
                                    )
                                else:
                                    sync_data["inclinometer"][f"{key}_{col}"] = (
                                        self.inclinometer[key][col].to_numpy()
                                    )
                    else:
                        inclinometer_time = (
                            self.inclinometer["timestamp"].to_numpy() + total_offset
                        )

                        for col in self.inclinometer.columns:
                            if col == self.__inclinometer_names["timestamp"]:
                                sync_data["inclinometer"][col] = inclinometer_time
                            else:
                                sync_data["inclinometer"][col] = self.inclinometer[
                                    col
                                ].to_numpy()

        elif key.lower() == "camera":
            if self.camera is None:
                logger.warning("Camera data not available, skipping camera sync")
                continue

            sync_data["camera"] = {}

            camera_offset = self.offsets["camera"]["time_offset"]
            litchi_offset = self.offsets.get("litchi_gps", {}).get(
                "time_offset", 0.0
            )
            total_offset = camera_offset + litchi_offset

            camera_time = self.camera["timestamp"].to_numpy() + total_offset

            logger.info(
                f"Applying camera total offset: {total_offset:.3f}s "
                f"(camera→litchi: {camera_offset:.3f}s + litchi→gps: {litchi_offset:.3f}s)"
            )

            if common_time:
                camera_rate = np.average(
                    1 / np.diff(self.camera[self.__camera_names["timestamp"]])
                )

                n_samples = int((t_end - t_start) * camera_rate) + 1
                target_time = np.linspace(t_start, t_end, n_samples)

                if "camera" in self.offsets:
                    # Camera offset is relative to Litchi, not GPS payload
                    # Need to add both: camera-to-litchi + litchi-to-gps

                    sync_data["camera"]["timestamp"] = target_time

                    for col in self.camera.columns:
                        if col == "timestamp":
                            continue
                        else:
                            values = self.camera[col].to_numpy().astype(float)
                            interpolated = np.interp(
                                target_time,
                                camera_time,
                                values,
                                left=np.nan,
                                right=np.nan,
                            )
                            sync_data["camera"][f"{col}"] = interpolated
            else:
                for col in self.camera.columns:
                    if col == self.__camera_names["timestamp"]:
                        sync_data["camera"][self.__camera_names["timestamp"]] = (
                            camera_time
                        )
                    else:
                        sync_data["camera"][col] = self.camera[col]

    if "camera" not in sync_data and self.camera is not None:
        logger.info("Camera offset is applied ")

    if self.other_payload:
        sync_data["payload"] = {}

        for sensor_name, sensor_df in self.other_payload.items():
            n_samples = int((t_end - t_start) * target_rate["payload"]) + 1
            target_time = np.linspace(t_start, t_end, n_samples)

            sync_data["payload"][f"{sensor_name}"] = {}

            if "timestamp" in sensor_df.columns:
                sensor_time = sensor_df["timestamp"].to_numpy().copy()

                incl_offset = self.offsets.get("inclinometer", {}).get(
                    "time_offset", 0.0
                )
                litchi_offset = self.offsets.get("litchi_gps", {}).get(
                    "time_offset", 0.0
                )
                total_offset = incl_offset + litchi_offset

                sensor_time += total_offset

                for col in sensor_df.columns:
                    if col == "timestamp":
                        if common_time:
                            sync_data["payload"][f"{sensor_name}"]["timestamp"] = (
                                target_time
                            )
                        else:
                            sync_data["payload"][f"{sensor_name}"]["timestamp"] = (
                                sensor_time
                            )
                    else:
                        if common_time:
                            values = sensor_df[col].to_numpy().astype(float)
                            interpolated = np.interp(
                                target_time,
                                sensor_time,
                                values,
                                left=np.nan,
                                right=np.nan,
                            )
                            sync_data["payload"][f"{sensor_name}"][f"{col}"] = (
                                interpolated
                            )
                        else:
                            sync_data["payload"][f"{sensor_name}"][f"{col}"] = (
                                sensor_df[col].to_numpy()
                            )

    sync_data["reference_gps"] = self.gps_payload

    self.synchronized_data = {}

    for key, value in sync_data.items():
        if key == "payload":
            for keyp, valuep in sync_data["payload"].items():
                self.synchronized_data[keyp] = pl.DataFrame(valuep)
        else:
            self.synchronized_data[key] = pl.DataFrame(value)

    # Standardize coordinate column names for GPS sources
    if "drone" in self.synchronized_data and hasattr(
        self, "_Synchronizer__drone_names"
    ):
        self.synchronized_data["drone"] = self._standardize_coordinate_columns(
            self.synchronized_data["drone"], self.__drone_names
        )

    if "litchi" in self.synchronized_data and hasattr(
        self, "_Synchronizer__litchi_names"
    ):
        self.synchronized_data["litchi"] = self._standardize_coordinate_columns(
            self.synchronized_data["litchi"], self.__litchi_names
        )

    if "reference_gps" in self.synchronized_data and hasattr(
        self, "_Synchronizer__ref_names"
    ):
        self.synchronized_data["reference_gps"] = (
            self._standardize_coordinate_columns(
                self.synchronized_data["reference_gps"], self.__ref_names
            )
        )

    logger.info(f"({t_end - t_start:.2f}s duration)")

    return self.synchronized_data

get_offset_summary

get_offset_summary() -> str

Get summary of detected time offsets.

Returns:

str
    Formatted string with offset information

Source code in pils/synchronizer.py
def get_offset_summary(self) -> str:
    """
    Get summary of detected time offsets.

    Sources that are correlated against Litchi rather than the GPS payload
    (inclinometer, camera) are reported twice: once relative to Litchi and
    once as the total offset to the GPS payload timebase
    (relative offset + litchi-to-GPS offset), matching how offsets are
    actually applied during synchronization.

    Returns
    -------
    str
        Formatted string with offset information
    """
    if not self.offsets:
        return "No offsets detected. Run synchronize() first."

    lines = ["Correlation Synchronizer - Detected Time Offsets", "=" * 60]

    # Litchi's own offset to the GPS payload timebase; 0.0 when Litchi
    # was not part of the synchronization run.
    litchi_offset = self.offsets.get("litchi_gps", {}).get("time_offset", 0.0)

    for source_name, offset_data in self.offsets.items():
        lines.append(f"\n{source_name.upper()}")

        # Inclinometer and camera offsets are detected relative to Litchi,
        # not the GPS payload, so show both the relative and total offsets.
        if source_name in ("inclinometer", "camera"):
            rel_offset = offset_data["time_offset"]
            total_offset = rel_offset + litchi_offset
            lines.append(
                f"  Time Offset (relative to Litchi): {rel_offset:.3f} s"
            )
            lines.append(
                f"  Time Offset (total, relative to GPS): {total_offset:.3f} s"
            )
        else:
            lines.append(f"  Time Offset: {offset_data['time_offset']:.3f} s")

        lines.append(f"  Correlation: {offset_data['correlation']:.3f}")

        # GPS sources may additionally carry spatial (ENU) offset diagnostics.
        if "spatial_offset_m" in offset_data:
            lines.append(
                f"  Spatial Offset: {offset_data['spatial_offset_m']:.2f} m"
            )
            lines.append(f"    East: {offset_data['east_offset_m']:.2f} m")
            lines.append(f"    North: {offset_data['north_offset_m']:.2f} m")
            lines.append(f"    Up: {offset_data['up_offset_m']:.2f} m")

    return "\n".join(lines)