Adding Sensors¶
Extend PILS with new sensor types.
Overview¶
PILS uses a sensor registry pattern that makes adding new sensors straightforward:
- Create sensor class
- Register in sensor config
- Use via Flight interface
Sensor Class Template¶
Basic Structure¶
"""
New sensor module.
"""
from pathlib import Path
from typing import Optional
import polars as pl
class NewSensor:
"""
New sensor data handler.
Attributes:
file: Path to sensor data file
data: Loaded sensor data as Polars DataFrame
"""
def __init__(self, file: Path) -> None:
"""
Initialize sensor with file path.
Args:
file: Path to sensor data file
"""
self.file = Path(file)
self._data: Optional[pl.DataFrame] = None
@property
def data(self) -> pl.DataFrame:
"""
Get sensor data, loading if necessary.
Returns:
Sensor data as Polars DataFrame
"""
if self._data is None:
self._data = self.load_data()
return self._data
def load_data(self) -> pl.DataFrame:
"""
Load sensor data from file.
Returns:
Loaded DataFrame
Raises:
FileNotFoundError: If file doesn't exist
"""
if not self.file.exists():
raise FileNotFoundError(f"Sensor file not found: {self.file}")
# Load data (customize for your format)
df = pl.read_csv(self.file)
# Validate and transform
df = self._validate_schema(df)
df = self._transform_data(df)
return df
def _validate_schema(self, df: pl.DataFrame) -> pl.DataFrame:
"""Validate DataFrame schema."""
required_columns = ['timestamp', 'value']
for col in required_columns:
if col not in df.columns:
raise ValueError(f"Missing required column: {col}")
return df
def _transform_data(self, df: pl.DataFrame) -> pl.DataFrame:
"""Apply data transformations."""
# Example: ensure correct types
df = df.with_columns([
pl.col('timestamp').cast(pl.Int64),
pl.col('value').cast(pl.Float64),
])
return df
Registration¶
Step 1: Create Sensor File¶
Create pils/sensors/new_sensor.py:
from pathlib import Path
from typing import Optional
import polars as pl
class Barometer:
"""Barometer sensor for pressure/altitude data."""
def __init__(self, file: Path) -> None:
self.file = Path(file)
self._data: Optional[pl.DataFrame] = None
@property
def data(self) -> pl.DataFrame:
if self._data is None:
self._data = self.load_data()
return self._data
def load_data(self) -> pl.DataFrame:
if not self.file.exists():
raise FileNotFoundError(f"File not found: {self.file}")
df = pl.read_csv(self.file)
# Convert pressure to altitude
df = df.with_columns([
((1 - (pl.col('pressure') / 1013.25) ** 0.190284) * 44330.77)
.alias('altitude_baro')
])
return df
Step 2: Register in Config¶
Update pils/sensors/sensors.py:
from pils.sensors.adc import ADC
from pils.sensors.gps import GPS
from pils.sensors.IMU import IMU
from pils.sensors.inclinometer import Inclinometer
from pils.sensors.barometer import Barometer # Add import
sensor_config = {
"gps": {
"class": GPS,
"load_method": "load_data",
},
"imu": {
"class": IMU,
"load_method": "load_all",
},
"adc": {
"class": ADC,
"load_method": "load_data",
},
"inclinometer": {
"class": Inclinometer,
"load_method": "load_data",
},
# Add new sensor
"barometer": {
"class": Barometer,
"load_method": "load_data",
},
}
Step 3: Use via Flight¶
from pils.flight import Flight
flight = Flight(flight_info={'flight_id': 'test'})
flight.add_sensor_data(['barometer'])
baro_data = flight['barometer'].data
print(baro_data.head())
Complete Example: Barometer¶
File: pils/sensors/barometer.py¶
"""
Barometer sensor module.
Handles pressure and barometric altitude data.
"""
from pathlib import Path
from typing import Optional
import polars as pl
class Barometer:
"""
Barometer sensor data handler.
Reads pressure data and computes barometric altitude using
the international standard atmosphere model.
Attributes:
file: Path to barometer data file
data: Loaded data as Polars DataFrame
Example:
>>> baro = Barometer(Path("baro.csv"))
>>> df = baro.data
>>> print(df.columns)
['timestamp', 'pressure', 'temperature', 'altitude_baro']
"""
# Standard atmosphere constants
P0 = 1013.25 # hPa (sea level pressure)
L = 0.0065 # K/m (lapse rate)
T0 = 288.15 # K (sea level temperature)
def __init__(self, file: Path) -> None:
"""
Initialize barometer with file path.
Args:
file: Path to CSV data file
"""
self.file = Path(file)
self._data: Optional[pl.DataFrame] = None
@property
def data(self) -> pl.DataFrame:
"""Get sensor data, loading if necessary."""
if self._data is None:
self._data = self.load_data()
return self._data
def load_data(self) -> pl.DataFrame:
"""
Load barometer data from CSV file.
Expected columns:
- timestamp: Unix microseconds
- pressure: Pressure in hPa
- temperature: Temperature in Celsius (optional)
Returns:
DataFrame with computed altitude
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If required columns missing
"""
if not self.file.exists():
raise FileNotFoundError(f"Barometer file not found: {self.file}")
df = pl.read_csv(self.file)
# Validate required columns
if 'pressure' not in df.columns:
raise ValueError("Missing required column: pressure")
if 'timestamp' not in df.columns:
raise ValueError("Missing required column: timestamp")
# Compute barometric altitude
df = df.with_columns([
pl.col('timestamp').cast(pl.Int64),
pl.col('pressure').cast(pl.Float64),
self._pressure_to_altitude(pl.col('pressure')).alias('altitude_baro'),
])
return df
@staticmethod
def _pressure_to_altitude(pressure: pl.Expr) -> pl.Expr:
"""
Convert pressure to altitude using ISA model.
h = (T0 / L) * (1 - (P / P0)^(R*L / g))
Simplified: h = 44330.77 * (1 - (P / P0)^0.190284)
"""
return ((1 - (pressure / Barometer.P0) ** 0.190284) * 44330.77)
def calibrate(self, ground_altitude: float) -> pl.DataFrame:
"""
Calibrate barometric altitude to known ground altitude.
Args:
ground_altitude: Known altitude at start (meters)
Returns:
DataFrame with calibrated altitude
"""
df = self.data
offset = df['altitude_baro'][0] - ground_altitude
return df.with_columns([
(pl.col('altitude_baro') - offset).alias('altitude_calibrated')
])
Tests: tests/test_barometer.py¶
"""Tests for barometer sensor."""
import pytest
import polars as pl
from pathlib import Path
from pils.sensors.barometer import Barometer
class TestBarometer:
"""Test suite for Barometer sensor."""
@pytest.fixture
def sample_csv(self, tmp_path: Path) -> Path:
"""Create sample barometer CSV."""
df = pl.DataFrame({
'timestamp': [1000000, 2000000, 3000000],
'pressure': [1013.25, 1010.0, 1005.0],
'temperature': [20.0, 19.5, 19.0],
})
csv_file = tmp_path / "baro.csv"
df.write_csv(csv_file)
return csv_file
def test_load_data(self, sample_csv: Path):
"""Test basic data loading."""
baro = Barometer(file=sample_csv)
df = baro.data
assert df.shape[0] == 3
assert 'pressure' in df.columns
assert 'altitude_baro' in df.columns
def test_altitude_calculation(self, sample_csv: Path):
"""Test altitude calculation from pressure."""
baro = Barometer(file=sample_csv)
df = baro.data
# At sea level pressure, altitude should be ~0
assert abs(df['altitude_baro'][0]) < 1.0
# Lower pressure = higher altitude
assert df['altitude_baro'][1] > df['altitude_baro'][0]
assert df['altitude_baro'][2] > df['altitude_baro'][1]
def test_calibration(self, sample_csv: Path):
"""Test altitude calibration."""
baro = Barometer(file=sample_csv)
calibrated = baro.calibrate(ground_altitude=100.0)
assert 'altitude_calibrated' in calibrated.columns
assert abs(calibrated['altitude_calibrated'][0] - 100.0) < 0.1
def test_missing_file_raises(self):
"""Test error for missing file."""
with pytest.raises(FileNotFoundError):
Barometer(file=Path("/nonexistent/baro.csv"))
def test_missing_pressure_raises(self, tmp_path: Path):
"""Test error for missing pressure column."""
csv_file = tmp_path / "bad.csv"
pl.DataFrame({'timestamp': [1000000]}).write_csv(csv_file)
with pytest.raises(ValueError, match="pressure"):
baro = Barometer(file=csv_file)
_ = baro.data
Binary Sensors¶
For binary format sensors, extend the template:
import struct
from pathlib import Path
from typing import Optional, List, Dict
import polars as pl
class BinarySensor:
"""Binary format sensor."""
HEADER = b'\xAA\x55'
RECORD_SIZE = 16
def __init__(self, file: Path) -> None:
self.file = Path(file)
self._data: Optional[pl.DataFrame] = None
@property
def data(self) -> pl.DataFrame:
if self._data is None:
self._data = self.load_data()
return self._data
def load_data(self) -> pl.DataFrame:
raw = self.file.read_bytes()
records = self._parse_binary(raw)
return pl.DataFrame(records)
def _parse_binary(self, data: bytes) -> List[Dict]:
records = []
offset = 0
while offset < len(data) - self.RECORD_SIZE:
# Find header
if data[offset:offset+2] != self.HEADER:
offset += 1
continue
record = self._parse_record(data[offset:offset+self.RECORD_SIZE])
records.append(record)
offset += self.RECORD_SIZE
return records
def _parse_record(self, data: bytes) -> Dict:
# Example: timestamp (4 bytes) + value (4 bytes float)
timestamp, value = struct.unpack('<If', data[2:10])
return {
'timestamp': timestamp,
'value': value,
}
Documentation Requirements¶
When adding a new sensor or analysis module, you MUST create documentation in two places:
1. API Documentation¶
Create a new file in docs/api/sensors/ that automatically extracts docstrings from your Python code using mkdocstrings.
Docstrings are Automatic
You do NOT manually copy docstrings into the API documentation. The ::: syntax automatically extracts them from your Python source code. Just reference the module path.
Example: docs/api/sensors/barometer.md
# Barometer Sensor API
::: pils.sensors.barometer.Barometer
options:
show_root_heading: true
show_source: true
heading_level: 2
Or for entire module:
# Barometer API
::: pils.sensors.barometer
options:
show_root_heading: true
show_source: false
heading_level: 2
The ::: directive tells mkdocstrings to:
1. Find the Python class/function
2. Extract its docstring
3. Render it as formatted documentation
4. Optionally show source code (show_source: true)
For Analysis Modules: docs/api/analysis/velocity.md
# Velocity Analysis API
::: pils.analyze.velocity.VelocityAnalysis
options:
show_root_heading: true
show_source: true
heading_level: 2
::: pils.analyze.velocity.VelocityVersion
options:
show_root_heading: true
show_source: false
heading_level: 2
Or entire module:
2. User Guide Documentation¶
Create a comprehensive user guide in docs/user-guide/ explaining:
- What the sensor/analysis does
- When to use it
- How to use it with examples
- Input/output data formats
- Configuration options
- Common use cases
Example: docs/user-guide/barometer.md
# Barometer Sensor
The barometer sensor measures atmospheric pressure and computes barometric altitude.
## Overview
Barometric altitude is computed using the International Standard Atmosphere (ISA) model:
$$
h = \frac{T_0}{L} \left(1 - \left(\frac{P}{P_0}\right)^{\frac{RL}{g}}\right)
$$
Where:
- $h$ = altitude (meters)
- $P$ = measured pressure (hPa)
- $P_0$ = sea level pressure (1013.25 hPa)
- $T_0$ = sea level temperature (288.15 K)
- $L$ = lapse rate (0.0065 K/m)
## When to Use
- **Altitude estimation**: When GPS altitude is noisy or unavailable
- **Vertical velocity**: For climb/descent rate calculations
- **Altitude calibration**: Cross-check with GPS altitude
- **Indoor positioning**: Works without GPS signal
## Quick Start
\```python
from pils.flight import Flight
# Load flight with barometer
flight = Flight.from_path('/path/to/flight')
flight.add_sensor_data(['barometer'])
# Access barometer data
baro_data = flight['barometer'].data
print(baro_data.head())
\```
## Data Schema
| Column | Type | Unit | Description |
|--------|------|------|-------------|
| `timestamp` | `Int64` | microseconds | Unix timestamp |
| `pressure` | `Float64` | hPa | Atmospheric pressure |
| `temperature` | `Float64` | °C | Temperature (optional) |
| `altitude_baro` | `Float64` | meters | Computed barometric altitude |
## Calibration
Calibrate to known ground altitude:
\```python
baro = flight['barometer']
ground_altitude = 150.0 # meters
# Calibrate
calibrated = baro.calibrate(ground_altitude)
print(calibrated['altitude_calibrated'])
\```
## Examples
### Altitude Comparison
Compare barometric altitude with GPS altitude:
\```python
import polars as pl
# Get both sensors
baro = flight['barometer'].data
gps = flight['gps'].data
# Merge on timestamp
merged = baro.join(gps, on='timestamp', how='inner')
# Compute difference
merged = merged.with_columns([
(pl.col('altitude_baro') - pl.col('altitude_gps')).alias('alt_diff')
])
print(f"Mean difference: {merged['alt_diff'].mean():.2f} m")
\```
### Vertical Velocity
Compute climb rate from barometric altitude:
\```python
baro = flight['barometer'].data
# Compute vertical velocity
baro = baro.with_columns([
pl.col('altitude_baro').diff().alias('dh'),
pl.col('timestamp').diff().alias('dt'),
])
baro = baro.with_columns([
(pl.col('dh') / pl.col('dt') * 1e6).alias('vertical_speed') # m/s
])
print(f"Max climb rate: {baro['vertical_speed'].max():.2f} m/s")
\```
## See Also
- [GPS Sensor](gps.md) - GPS altitude reference
- [Synchronization](synchronization.md) - Align barometer with GPS
- [API Reference](../api/sensors/barometer.md) - Detailed API docs
3. Update Navigation¶
Add your new documentation to mkdocs.yml:
nav:
- API Reference:
- Sensors:
- api/sensors/index.md
- GPS: api/sensors/gps.md
- IMU: api/sensors/imu.md
- Barometer: api/sensors/barometer.md # Add here
- User Guide:
- user-guide/index.md
- Sensors:
- Barometer: user-guide/barometer.md # Add here
Documentation is Mandatory
When adding a sensor or analysis module, you MUST create both API documentation and user guide documentation. This ensures the documentation stays synchronized with the codebase.
Required Files:
- API Doc: docs/api/sensors/your-sensor.md or docs/api/analysis/your-analysis.md
- User Guide: docs/user-guide/your-sensor.md or docs/user-guide/your-analysis.md
- Navigation: Update mkdocs.yml to include new pages
Checklist¶
When adding a new sensor:
- [ ] Create sensor class in
pils/sensors/ - [ ] Implement
__init__,dataproperty,load_data - [ ] Add type hints to all methods
- [ ] Write docstrings
- [ ] Register in
sensor_config - [ ] Write tests (TDD)
- [ ] Create API documentation (
docs/api/sensors/) - [ ] Create user guide (
docs/user-guide/) - [ ] Update
mkdocs.ymlnavigation - [ ] Update schema docs
See Also¶
- Sensors API - Existing sensors
- Testing - Test guidelines