"""Profiling lidar models for HyPlan.
This module currently implements NASA Langley's Aerosol Wind Profiler (AWP)
as a planning-oriented instrument model. The emphasis is on observation
geometry and stable-flight sampling constraints rather than Doppler retrieval
physics or atmospheric backscatter performance.
References
----------
Bedka, K., Marketon, J., Henderson, S., and Kavaya, M. (2024).
"AWP: NASA's Aerosol Wind Profiler Coherent Doppler Wind Lidar." In
Singh, U. N., Tzeremes, G., Refaat, T. F., and Ribes Pleguezuelo, P. (eds),
*Space-based Lidar Remote Sensing Techniques and Emerging Technologies*,
LIDAR 2023, Springer Aerospace Technology.
https://doi.org/10.1007/978-3-031-53618-2_3
Bedka, K. (2025). *3-D Lidar Wind Airborne Profiling Using A
Coherent-Detection Doppler Wind Lidar Designed For Space-Based Operation*.
Final Project Report for NOAA Broad Agency Announcement, "Measuring the
Atmospheric Wind Profile (3D Winds): Call for Studies and Field Measurement
Campaigns."
"""
from __future__ import annotations
import datetime as _dt
from typing import Iterable, Optional, Tuple
import geopandas as gpd
import numpy as np
import pandas as pd
import pymap3d.vincenty
from pint import Quantity
from shapely.geometry import LineString, Point
from ..exceptions import HyPlanTypeError, HyPlanValueError
from ..flight_line import FlightLine
from ..geometry import process_linestring, wrap_to_180, wrap_to_360
from ..terrain import generate_demfile, get_elevations, ray_terrain_intersection
from ..units import ureg
from ._base import Sensor
__all__ = [
"AerosolWindProfiler",
"flag_awp_stable_segments",
"awp_profile_locations_for_flight_line",
"awp_profile_locations_for_plan",
]
_LIGHT_SPEED_MPS = 299_792_458.0
def _as_quantity(value, unit: str, label: str) -> Quantity:
"""Normalize *value* to a quantity in *unit*."""
if isinstance(value, Quantity):
return value.to(unit)
if isinstance(value, (int, float)):
return ureg.Quantity(float(value), unit)
raise HyPlanTypeError(f"{label} must be numeric or a pint.Quantity, got {type(value)}")
[docs]
class AerosolWindProfiler(Sensor):
"""NASA Langley Aerosol Wind Profiler (AWP).
A coherent Doppler wind lidar that alternates between two line-of-sight
(LOS) directions to recover a vector wind profile, plus a nadir dwell.
The abstraction captures the planning geometry needed to answer questions
like:
- How far apart are valid vector retrievals along track?
- Where do the LOS footprints intersect the surface?
- How much straight-and-level flight is needed before vector retrievals
become valid?
All constructor parameters default to the airborne AWP configuration
described by Bedka et al. (2024) and Bedka (2025):
- wavelength: 2052.92 nm
- 30° off nadir
- dual LOS at ±45° azimuth from the aircraft nose
- 200 Hz pulse rate
- 500 MHz digitization rate
- 65528 samples per pulse
- common airborne mode of 3 s per LOS + 1 s nadir
- QC limits of 3° roll and 0.5 km altitude change per profile
References
----------
Bedka, K., Marketon, J., Henderson, S., and Kavaya, M. (2024).
"AWP: NASA's Aerosol Wind Profiler Coherent Doppler Wind Lidar." In
*Space-based Lidar Remote Sensing Techniques and Emerging Technologies*.
Springer. https://doi.org/10.1007/978-3-031-53618-2_3
Bedka, K. (2025). *3-D Lidar Wind Airborne Profiling Using A
Coherent-Detection Doppler Wind Lidar Designed For Space-Based Operation*.
NOAA BAA final project report.
"""
[docs]
def __init__(
self,
name: str = "Aerosol Wind Profiler",
*,
wavelength: Quantity = 2052.92 * ureg.nanometer,
off_nadir_angle: float = 30.0,
los_azimuths_relative: Iterable[float] = (-45.0, 45.0),
pulse_rate: Quantity = 200 * ureg.Hz,
digitization_rate: Quantity = 500e6 * ureg.Hz,
samples_per_pulse: int = 65528,
dwell_time_per_los: Quantity = 3 * ureg.second,
nadir_dwell_time: Quantity = 1 * ureg.second,
blind_zone: Quantity = 200 * ureg.meter,
max_roll_angle: Quantity = 3 * ureg.degree,
max_heading_change: Quantity = 3 * ureg.degree,
max_altitude_change: Quantity = 0.5 * ureg.kilometer,
nominal_airspeed: Quantity = 225 * ureg.meter / ureg.second,
):
super().__init__(name)
self.wavelength = _as_quantity(wavelength, "nanometer", "wavelength")
self.pulse_rate = _as_quantity(pulse_rate, "Hz", "pulse_rate")
self.digitization_rate = _as_quantity(digitization_rate, "Hz", "digitization_rate")
self.dwell_time_per_los = _as_quantity(dwell_time_per_los, "second", "dwell_time_per_los")
self.nadir_dwell_time = _as_quantity(nadir_dwell_time, "second", "nadir_dwell_time")
self.blind_zone = _as_quantity(blind_zone, "meter", "blind_zone")
self.max_roll_angle = _as_quantity(max_roll_angle, "degree", "max_roll_angle")
self.max_heading_change = _as_quantity(max_heading_change, "degree", "max_heading_change")
self.max_altitude_change = _as_quantity(max_altitude_change, "meter", "max_altitude_change")
self.nominal_airspeed = _as_quantity(nominal_airspeed, "meter / second", "nominal_airspeed")
if not (0.0 < float(off_nadir_angle) < 90.0):
raise HyPlanValueError("off_nadir_angle must be between 0 and 90 degrees")
self.off_nadir_angle = float(off_nadir_angle)
rel_az = tuple(float(v) for v in los_azimuths_relative)
if len(rel_az) != 2:
raise HyPlanValueError("los_azimuths_relative must contain exactly two azimuths")
if np.isclose((rel_az[1] - rel_az[0]) % 360.0, 0.0):
raise HyPlanValueError("The two LOS azimuths must not be identical")
self.los_azimuths_relative = rel_az
if self.pulse_rate.magnitude <= 0:
raise HyPlanValueError("pulse_rate must be positive")
if self.digitization_rate.magnitude <= 0:
raise HyPlanValueError("digitization_rate must be positive")
if samples_per_pulse <= 0:
raise HyPlanValueError("samples_per_pulse must be positive")
self.samples_per_pulse = int(samples_per_pulse)
for label, q in (
("dwell_time_per_los", self.dwell_time_per_los),
("nadir_dwell_time", self.nadir_dwell_time),
("blind_zone", self.blind_zone),
("max_roll_angle", self.max_roll_angle),
("max_heading_change", self.max_heading_change),
("max_altitude_change", self.max_altitude_change),
("nominal_airspeed", self.nominal_airspeed),
):
if q.magnitude <= 0:
raise HyPlanValueError(f"{label} must be positive")
@property
def sample_period(self) -> Quantity:
"""ADC sample period."""
return (1.0 / self.digitization_rate).to("second") # type: ignore[no-any-return]
[docs]
def max_slant_range(self) -> Quantity:
"""Maximum recorded LOS range from pulse digitization."""
distance_m = self.samples_per_pulse * _LIGHT_SPEED_MPS * self.sample_period.m_as("second") / 2.0
return distance_m * ureg.meter # type: ignore[no-any-return]
[docs]
def max_vertical_range(self) -> Quantity:
"""Maximum vertical profiling extent for the configured off-nadir view."""
return self.max_slant_range() * np.cos(np.radians(self.off_nadir_angle)) # type: ignore[return-value,no-any-return]
[docs]
def vertical_bin_spacing(self, fft_samples: int = 256) -> Quantity:
"""Vertical bin spacing implied by an FFT window length.
The range gate spans ``fft_samples`` digitized points. Converting the
two-way light-travel time to slant range and projecting by the
off-nadir angle reproduces the 66-132 m planning values cited for AWP
in Bedka et al. (2024) and the Bedka (2025) final report for 256-512
sample windows.
"""
if fft_samples <= 0:
raise HyPlanValueError("fft_samples must be positive")
slant_m = fft_samples * _LIGHT_SPEED_MPS * self.sample_period.m_as("second") / 2.0
return slant_m * np.cos(np.radians(self.off_nadir_angle)) * ureg.meter # type: ignore[no-any-return]
[docs]
def los_ground_distance(self, altitude_agl: Quantity) -> Quantity:
"""Ground distance from nadir to the LOS surface intercept."""
altitude_agl = _as_quantity(altitude_agl, "meter", "altitude_agl")
return altitude_agl * np.tan(np.radians(self.off_nadir_angle)) # type: ignore[return-value,no-any-return]
[docs]
def los_orientations(self, track_heading: float) -> Tuple[float, float]:
"""Return absolute LOS azimuths (degrees true) for a given track heading."""
abs_az = wrap_to_360(np.array(self.los_azimuths_relative) + float(track_heading))
return float(abs_az[0]), float(abs_az[1])
[docs]
def los_surface_separation(self, altitude_agl: Quantity) -> Quantity:
"""Surface separation between the two LOS intercepts.
For AWP's default ``±45°`` azimuth pair the included angle is ``90°``,
so at 12 km altitude and 30° off nadir this is about 9.8 km, matching
the ``~10 km`` separation described by Bedka (2025).
"""
radius = self.los_ground_distance(altitude_agl).m_as("meter")
az0, az1 = self.los_azimuths_relative
delta = np.radians(abs(((az1 - az0) + 180.0) % 360.0 - 180.0))
separation = np.sqrt(radius ** 2 + radius ** 2 - 2.0 * radius ** 2 * np.cos(delta))
return separation * ureg.meter # type: ignore[no-any-return]
[docs]
def los_ground_intercepts(
self,
latitude: float,
longitude: float,
track_heading: float,
altitude_agl: Quantity,
) -> dict[str, dict[str, float]]:
"""Ground intercepts of the two LOS beams for a platform location."""
radius_m = self.los_ground_distance(altitude_agl).m_as("meter")
azimuths = self.los_orientations(track_heading)
intercepts: dict[str, dict[str, float]] = {}
for idx, azimuth in enumerate(azimuths, start=1):
lat_i, lon_i = pymap3d.vincenty.vreckon(latitude, longitude, radius_m, azimuth)
intercepts[f"los{idx}"] = {
"latitude": float(lat_i),
"longitude": float(wrap_to_180(lon_i)),
"azimuth": float(azimuth),
}
return intercepts
[docs]
def vector_profile_time_spacing(
self,
*,
dwell_time_per_los: Quantity | None = None,
nadir_dwell_time: Quantity | None = None,
) -> Quantity:
"""Nominal time spacing between vector wind profiles.
This follows the AWP airborne concept of operations described by
Bedka et al. (2024) and Bedka (2025): a vector retrieval is assigned at
the LOS1→LOS2 switch, producing a spacing of
``2 * dwell_time_per_los + nadir_dwell_time``.
"""
dwell = _as_quantity(
dwell_time_per_los if dwell_time_per_los is not None else self.dwell_time_per_los,
"second",
"dwell_time_per_los",
)
nadir = _as_quantity(
nadir_dwell_time if nadir_dwell_time is not None else self.nadir_dwell_time,
"second",
"nadir_dwell_time",
)
return (2.0 * dwell + nadir).to("second") # type: ignore[no-any-return]
[docs]
def profile_assignment_offset(
self,
ground_speed: Quantity | None = None,
*,
dwell_time_per_los: Quantity | None = None,
nadir_dwell_time: Quantity | None = None,
) -> Quantity:
"""Distance from the start of a stable leg to the first vector profile."""
speed = _as_quantity(
ground_speed if ground_speed is not None else self.nominal_airspeed,
"meter / second",
"ground_speed",
)
dwell = _as_quantity(
dwell_time_per_los if dwell_time_per_los is not None else self.dwell_time_per_los,
"second",
"dwell_time_per_los",
)
nadir = _as_quantity(
nadir_dwell_time if nadir_dwell_time is not None else self.nadir_dwell_time,
"second",
"nadir_dwell_time",
)
return (speed * (dwell + nadir)).to("meter") # type: ignore[no-any-return]
[docs]
def vector_profile_spacing(
self,
ground_speed: Quantity | None = None,
*,
dwell_time_per_los: Quantity | None = None,
nadir_dwell_time: Quantity | None = None,
) -> Quantity:
"""Along-track spacing between adjacent vector profiles."""
speed = _as_quantity(
ground_speed if ground_speed is not None else self.nominal_airspeed,
"meter / second",
"ground_speed",
)
dt = self.vector_profile_time_spacing(
dwell_time_per_los=dwell_time_per_los,
nadir_dwell_time=nadir_dwell_time,
)
return (speed * dt).to("meter") # type: ignore[no-any-return]
[docs]
def is_stable_segment(
self,
*,
heading_change: Quantity | float = 0.0,
altitude_change: Quantity | float = 0.0,
roll_angle: Quantity | float = 0.0,
) -> bool:
"""Return ``True`` when a segment satisfies AWP stability heuristics."""
heading = _as_quantity(heading_change, "degree", "heading_change")
altitude = _as_quantity(altitude_change, "meter", "altitude_change")
roll = _as_quantity(roll_angle, "degree", "roll_angle")
return bool(
abs(heading.m_as("degree")) <= self.max_heading_change.m_as("degree")
and abs(altitude.m_as("meter")) <= self.max_altitude_change.m_as("meter")
and abs(roll.m_as("degree")) <= self.max_roll_angle.m_as("degree")
)
_PROFILE_COLUMNS = [
"sample_index",
"source_segment_index",
"source_segment_name",
"source_segment_type",
"distance_from_start_m",
"elapsed_time_s",
"time_utc",
"platform_lat",
"platform_lon",
"platform_heading_deg",
"terrain_elevation_m",
"altitude_agl_m",
"altitude_msl_ft",
"profile_spacing_m",
"profile_spacing_s",
"profile_assignment_offset_m",
"los_surface_separation_m",
"stable_platform_ok",
"los1_azimuth_deg",
"los1_lat",
"los1_lon",
"los1_alt_m",
"los2_azimuth_deg",
"los2_lat",
"los2_lon",
"los2_alt_m",
"geometry",
]
def _empty_profile_gdf() -> gpd.GeoDataFrame:
frame = pd.DataFrame(columns=_PROFILE_COLUMNS)
return gpd.GeoDataFrame(frame, geometry="geometry", crs="EPSG:4326")
def _as_awp(sensor: Optional[AerosolWindProfiler]) -> AerosolWindProfiler:
if sensor is None:
return AerosolWindProfiler()
if not isinstance(sensor, AerosolWindProfiler):
raise HyPlanTypeError(
"sensor must be an AerosolWindProfiler or None (for default AWP)"
)
return sensor
def _altitude_agl_or_default(value: Optional[Quantity], fallback: Quantity) -> Quantity:
if value is None:
return fallback.to("meter")
return _as_quantity(value, "meter", "altitude_agl")
def _interpolate_geodesic(linestring: LineString, distances_m: Iterable[float]) -> list[dict[str, float]]:
"""Interpolate positions and headings along a WGS84 LineString."""
lats, lons, azimuths, cumulative = process_linestring(linestring)
if len(cumulative) < 2:
lon0, lat0 = linestring.coords[0]
return [
{"latitude": float(lat0), "longitude": float(lon0), "heading": float(azimuths[0])}
for _ in distances_m
]
out: list[dict[str, float]] = []
max_distance = float(cumulative[-1])
for distance_m in distances_m:
d = min(max(float(distance_m), 0.0), max_distance)
seg_idx = int(np.searchsorted(cumulative, d, side="right") - 1)
seg_idx = max(0, min(seg_idx, len(cumulative) - 2))
seg_start = float(cumulative[seg_idx])
seg_heading = float(azimuths[seg_idx])
lat0 = float(lats[seg_idx])
lon0 = float(lons[seg_idx])
remain = d - seg_start
lat_i, lon_i = pymap3d.vincenty.vreckon(lat0, lon0, remain, seg_heading)
out.append(
{
"latitude": float(lat_i),
"longitude": float(wrap_to_180(lon_i)),
"heading": seg_heading,
}
)
return out
def _max_heading_change(linestring: LineString) -> float:
"""Maximum change from the initial heading across a segment geometry."""
_, _, azimuths, _ = process_linestring(linestring)
if len(azimuths) <= 1:
return 0.0
ref = float(azimuths[0])
delta = ((np.asarray(azimuths) - ref + 180.0) % 360.0) - 180.0
return float(np.max(np.abs(delta)))
def _line_length_m(linestring: LineString) -> float:
_, _, _, cumulative = process_linestring(linestring)
if len(cumulative) == 0:
return 0.0
return float(cumulative[-1])
def _segment_groundspeed(row: pd.Series) -> Optional[Quantity]:
groundspeed = row.get("groundspeed_kts")
if pd.notna(groundspeed) and float(groundspeed) > 0:
return ureg.Quantity(float(groundspeed), "knot")
distance = row.get("distance")
duration = row.get("time_to_segment")
if pd.notna(distance) and pd.notna(duration) and float(duration) > 0:
return ureg.Quantity(float(distance) / float(duration), "nautical_mile / minute")
return None
def _profile_sampling(
geometry: LineString,
sensor: AerosolWindProfiler,
ground_speed: Quantity,
dwell_time_per_los: Quantity | None,
nadir_dwell_time: Quantity | None,
) -> tuple[np.ndarray, list[dict[str, float]], float, float, float]:
"""Return profile sample distances, positions, and timing metadata."""
total_length_m = _line_length_m(geometry)
if total_length_m <= 0:
return np.array([], dtype=float), [], 0.0, 0.0, 0.0
spacing = sensor.vector_profile_spacing(
ground_speed,
dwell_time_per_los=dwell_time_per_los,
nadir_dwell_time=nadir_dwell_time,
)
offset = sensor.profile_assignment_offset(
ground_speed,
dwell_time_per_los=dwell_time_per_los,
nadir_dwell_time=nadir_dwell_time,
)
spacing_m = spacing.m_as("meter")
offset_m = offset.m_as("meter")
profile_spacing_s = sensor.vector_profile_time_spacing(
dwell_time_per_los=dwell_time_per_los,
nadir_dwell_time=nadir_dwell_time,
).m_as("second")
if offset_m > total_length_m:
return np.array([], dtype=float), [], spacing_m, offset_m, profile_spacing_s
sample_distances = np.arange(offset_m, total_length_m + 1e-9, spacing_m)
interpolated = _interpolate_geodesic(geometry, sample_distances)
return sample_distances, interpolated, spacing_m, offset_m, profile_spacing_s
def _resolve_platform_headings(
track_headings_deg: np.ndarray,
*,
crab_angle_deg: float | None = None,
heading_deg: float | None = None,
) -> np.ndarray:
"""Resolve aircraft headings from track headings and crab metadata."""
if heading_deg is not None:
return np.full_like(track_headings_deg, heading_deg % 360.0) # type: ignore[no-any-return]
if crab_angle_deg is not None:
return (track_headings_deg + crab_angle_deg) % 360.0 # type: ignore[no-any-return]
return track_headings_deg
def _terrain_dem_for_awp_profiles(
positions: list[dict[str, float]],
sensor: AerosolWindProfiler,
altitude_msl: Quantity,
*,
platform_headings_deg: np.ndarray | None = None,
) -> str:
"""Create a DEM covering the line and approximate LOS intercept envelope."""
altitude_guess = altitude_msl.to("meter")
dem_lats = [pos["latitude"] for pos in positions]
dem_lons = [pos["longitude"] for pos in positions]
if platform_headings_deg is None:
platform_headings_deg = np.asarray([pos["heading"] for pos in positions], dtype=float)
for pos, platform_heading in zip(positions, platform_headings_deg):
intercepts = sensor.los_ground_intercepts(
pos["latitude"],
pos["longitude"],
float(platform_heading),
altitude_guess,
)
dem_lats.extend([intercepts["los1"]["latitude"], intercepts["los2"]["latitude"]])
dem_lons.extend([intercepts["los1"]["longitude"], intercepts["los2"]["longitude"]])
return generate_demfile(np.asarray(dem_lats, dtype=float), np.asarray(dem_lons, dtype=float))
def _terrain_aware_profiles_for_geometry(
geometry: LineString,
*,
altitude_msl: Quantity,
altitude_msl_ft: float,
source_segment_index: int | None,
source_segment_name: str | None,
source_segment_type: str | None,
sensor: AerosolWindProfiler,
ground_speed: Quantity,
start_time: _dt.datetime | None,
dwell_time_per_los: Quantity | None,
nadir_dwell_time: Quantity | None,
dem_file: str | None,
terrain_precision: Quantity,
stable_platform_ok: bool,
crab_angle_deg: float | None = None,
heading_deg: float | None = None,
) -> gpd.GeoDataFrame:
sample_distances, interpolated, spacing_m, offset_m, profile_spacing_s = _profile_sampling(
geometry, sensor, ground_speed, dwell_time_per_los, nadir_dwell_time
)
if len(sample_distances) == 0:
return _empty_profile_gdf()
track_headings = np.asarray([pos["heading"] for pos in interpolated], dtype=float)
platform_headings = _resolve_platform_headings(
track_headings,
crab_angle_deg=crab_angle_deg,
heading_deg=heading_deg,
)
if dem_file is None:
dem_file = _terrain_dem_for_awp_profiles(
interpolated,
sensor,
altitude_msl,
platform_headings_deg=platform_headings,
)
platform_lats = np.asarray([pos["latitude"] for pos in interpolated], dtype=float)
platform_lons = np.asarray([pos["longitude"] for pos in interpolated], dtype=float)
terrain_elevation_m: np.ndarray = get_elevations(platform_lats, platform_lons, dem_file).astype(float)
altitude_msl_m = altitude_msl.m_as("meter")
altitude_agl_m = altitude_msl_m - terrain_elevation_m
if np.any(altitude_agl_m <= 0):
raise HyPlanValueError(
"terrain_aware=True requires platform altitude MSL to remain above terrain "
"along the profile locations."
)
los_azimuths = np.asarray(
[sensor.los_orientations(float(heading)) for heading in platform_headings],
dtype=float,
)
# ray_terrain_intersection expects tilt as off-nadir angle from vertical
# (0 deg = nadir, 90 deg = horizontal), which matches the AWP instrument model.
depression_angle = np.full(len(platform_lats), sensor.off_nadir_angle, dtype=float)
precision_m = terrain_precision.m_as("meter")
los1_lat, los1_lon, los1_alt = ray_terrain_intersection(
platform_lats,
platform_lons,
altitude_msl_m,
los_azimuths[:, 0],
depression_angle,
precision=precision_m,
dem_file=dem_file,
)
los2_lat, los2_lon, los2_alt = ray_terrain_intersection(
platform_lats,
platform_lons,
altitude_msl_m,
los_azimuths[:, 1],
depression_angle,
precision=precision_m,
dem_file=dem_file,
)
los_surface_separation_m = np.full(len(platform_lats), np.nan, dtype=float)
valid_pairs = (
np.isfinite(los1_lat)
& np.isfinite(los1_lon)
& np.isfinite(los2_lat)
& np.isfinite(los2_lon)
)
for idx in np.where(valid_pairs)[0]:
separation_m, _ = pymap3d.vincenty.vdist(
float(los1_lat[idx]),
float(los1_lon[idx]),
float(los2_lat[idx]),
float(los2_lon[idx]),
)
los_surface_separation_m[idx] = float(separation_m)
records = []
for sample_index, distance_m in enumerate(sample_distances, start=1):
elapsed_s = float(distance_m) / ground_speed.m_as("meter / second")
time_utc = start_time + _dt.timedelta(seconds=float(elapsed_s)) if start_time else pd.NaT
row = sample_index - 1
records.append(
{
"sample_index": sample_index,
"source_segment_index": source_segment_index,
"source_segment_name": source_segment_name,
"source_segment_type": source_segment_type,
"distance_from_start_m": float(distance_m),
"elapsed_time_s": float(elapsed_s),
"time_utc": time_utc,
"platform_lat": float(platform_lats[row]),
"platform_lon": float(platform_lons[row]),
"platform_heading_deg": float(platform_headings[row]),
"terrain_elevation_m": float(terrain_elevation_m[row]),
"altitude_agl_m": float(altitude_agl_m[row]),
"altitude_msl_ft": float(altitude_msl_ft),
"profile_spacing_m": float(spacing_m),
"profile_spacing_s": float(profile_spacing_s),
"profile_assignment_offset_m": float(offset_m),
"los_surface_separation_m": float(los_surface_separation_m[row]),
"stable_platform_ok": bool(stable_platform_ok),
"los1_azimuth_deg": float(los_azimuths[row, 0]),
"los1_lat": float(los1_lat[row]),
"los1_lon": float(wrap_to_180(los1_lon[row])) if np.isfinite(los1_lon[row]) else np.nan,
"los1_alt_m": float(los1_alt[row]) if np.isfinite(los1_alt[row]) else np.nan,
"los2_azimuth_deg": float(los_azimuths[row, 1]),
"los2_lat": float(los2_lat[row]),
"los2_lon": float(wrap_to_180(los2_lon[row])) if np.isfinite(los2_lon[row]) else np.nan,
"los2_alt_m": float(los2_alt[row]) if np.isfinite(los2_alt[row]) else np.nan,
"geometry": Point(float(platform_lons[row]), float(platform_lats[row])),
}
)
return gpd.GeoDataFrame(pd.DataFrame.from_records(records), geometry="geometry", crs="EPSG:4326")
def _terrain_aware_profiles_for_flight_line(
flight_line: FlightLine,
*,
sensor: AerosolWindProfiler,
ground_speed: Quantity,
start_time: _dt.datetime | None,
dwell_time_per_los: Quantity | None,
nadir_dwell_time: Quantity | None,
dem_file: str | None,
terrain_precision: Quantity,
) -> gpd.GeoDataFrame:
return _terrain_aware_profiles_for_geometry(
flight_line.geometry,
altitude_msl=flight_line.altitude_msl,
altitude_msl_ft=flight_line.altitude_msl.m_as("foot"),
source_segment_index=None,
source_segment_name=flight_line.site_name,
source_segment_type="flight_line",
sensor=sensor,
ground_speed=ground_speed,
start_time=start_time,
dwell_time_per_los=dwell_time_per_los,
nadir_dwell_time=nadir_dwell_time,
dem_file=dem_file,
terrain_precision=terrain_precision,
stable_platform_ok=True,
)
[docs]
def flag_awp_stable_segments(
plan: gpd.GeoDataFrame,
sensor: AerosolWindProfiler | None = None,
) -> gpd.GeoDataFrame:
"""Annotate a flight plan with AWP stability flags.
Bedka (2025) gives explicit QC limits for roll (3°) and
profile-to-profile altitude changes (0.5 km). HyPlan does not carry
per-sample roll, so this helper uses segment heading change as a planning
proxy for turns and unstable aircraft attitude.
"""
if not isinstance(plan, gpd.GeoDataFrame):
raise HyPlanTypeError("plan must be a GeoDataFrame")
awp = _as_awp(sensor)
flagged = plan.copy()
heading_changes: list[float] = []
altitude_changes_m: list[float] = []
stable_flags: list[bool] = []
always_unstable = {"takeoff", "climb", "descent", "approach", "loiter"}
for _, row in flagged.iterrows():
geom = row.geometry
if not isinstance(geom, LineString):
heading_change = 0.0
altitude_change_m = 0.0
stable = False
else:
heading_change = _max_heading_change(geom)
alt0 = row.get("start_altitude")
alt1 = row.get("end_altitude")
if pd.notna(alt0) and pd.notna(alt1):
altitude_change_m = abs(
ureg.Quantity(float(alt1) - float(alt0), "foot").m_as("meter")
)
else:
altitude_change_m = 0.0
stable = awp.is_stable_segment(
heading_change=heading_change * ureg.degree,
altitude_change=altitude_change_m * ureg.meter,
)
if row.get("segment_type") in always_unstable:
stable = False
heading_changes.append(float(heading_change))
altitude_changes_m.append(float(altitude_change_m))
stable_flags.append(bool(stable))
flagged["awp_heading_change_deg"] = heading_changes
flagged["awp_altitude_change_m"] = altitude_changes_m
flagged["awp_stable_platform_ok"] = stable_flags
return flagged
def _profiles_for_geometry(
geometry: LineString,
*,
altitude_agl: Quantity,
altitude_msl_ft: float,
source_segment_index: int | None,
source_segment_name: str | None,
source_segment_type: str | None,
sensor: AerosolWindProfiler,
ground_speed: Quantity,
start_time: _dt.datetime | None,
dwell_time_per_los: Quantity | None,
nadir_dwell_time: Quantity | None,
stable_platform_ok: bool,
crab_angle_deg: float | None = None,
heading_deg: float | None = None,
) -> gpd.GeoDataFrame:
sample_distances, interpolated, spacing_m, offset_m, profile_spacing_s = _profile_sampling(
geometry,
sensor,
ground_speed,
dwell_time_per_los,
nadir_dwell_time,
)
if len(sample_distances) == 0:
return _empty_profile_gdf()
track_headings = np.asarray([pos["heading"] for pos in interpolated], dtype=float)
platform_headings = _resolve_platform_headings(
track_headings,
crab_angle_deg=crab_angle_deg,
heading_deg=heading_deg,
)
records = []
for sample_index, (distance_m, pos, platform_heading) in enumerate(
zip(sample_distances, interpolated, platform_headings),
start=1,
):
intercepts = sensor.los_ground_intercepts(
pos["latitude"],
pos["longitude"],
float(platform_heading),
altitude_agl,
)
elapsed_s = distance_m / ground_speed.m_as("meter / second")
time_utc = start_time + _dt.timedelta(seconds=float(elapsed_s)) if start_time else pd.NaT
records.append(
{
"sample_index": sample_index,
"source_segment_index": source_segment_index,
"source_segment_name": source_segment_name,
"source_segment_type": source_segment_type,
"distance_from_start_m": float(distance_m),
"elapsed_time_s": float(elapsed_s),
"time_utc": time_utc,
"platform_lat": pos["latitude"],
"platform_lon": pos["longitude"],
"platform_heading_deg": float(platform_heading),
"terrain_elevation_m": np.nan,
"altitude_agl_m": altitude_agl.m_as("meter"),
"altitude_msl_ft": float(altitude_msl_ft),
"profile_spacing_m": spacing_m,
"profile_spacing_s": profile_spacing_s,
"profile_assignment_offset_m": offset_m,
"los_surface_separation_m": sensor.los_surface_separation(altitude_agl).m_as("meter"),
"stable_platform_ok": bool(stable_platform_ok),
"los1_azimuth_deg": intercepts["los1"]["azimuth"],
"los1_lat": intercepts["los1"]["latitude"],
"los1_lon": intercepts["los1"]["longitude"],
"los1_alt_m": np.nan,
"los2_azimuth_deg": intercepts["los2"]["azimuth"],
"los2_lat": intercepts["los2"]["latitude"],
"los2_lon": intercepts["los2"]["longitude"],
"los2_alt_m": np.nan,
"geometry": Point(pos["longitude"], pos["latitude"]),
}
)
return gpd.GeoDataFrame(pd.DataFrame.from_records(records), geometry="geometry", crs="EPSG:4326")
[docs]
def awp_profile_locations_for_flight_line(
flight_line: FlightLine,
*,
sensor: AerosolWindProfiler | None = None,
ground_speed: Quantity | None = None,
start_time: _dt.datetime | None = None,
altitude_agl: Quantity | None = None,
dwell_time_per_los: Quantity | None = None,
nadir_dwell_time: Quantity | None = None,
terrain_aware: bool = False,
dem_file: str | None = None,
terrain_precision: Quantity | float = 30.0,
) -> gpd.GeoDataFrame:
"""Predict AWP vector-profile sample locations along a single flight line.
The spacing and LOS geometry follow the AWP observing concept summarized by
Bedka et al. (2024) and Bedka (2025). When ``terrain_aware=True`` the
LOS intercepts are computed by ray-terrain intersection against a DEM and
the returned ``altitude_agl_m`` varies with local terrain beneath the line.
"""
if not isinstance(flight_line, FlightLine):
raise HyPlanTypeError("flight_line must be a FlightLine")
awp = _as_awp(sensor)
speed = _as_quantity(
ground_speed if ground_speed is not None else awp.nominal_airspeed,
"meter / second",
"ground_speed",
)
if terrain_aware and altitude_agl is not None:
raise HyPlanValueError(
"terrain_aware=True derives altitude AGL from flight_line.altitude_msl and DEM terrain; "
"do not also pass altitude_agl."
)
if terrain_aware:
return _terrain_aware_profiles_for_flight_line(
flight_line,
sensor=awp,
ground_speed=speed,
start_time=start_time,
dwell_time_per_los=dwell_time_per_los,
nadir_dwell_time=nadir_dwell_time,
dem_file=dem_file,
terrain_precision=_as_quantity(terrain_precision, "meter", "terrain_precision"),
)
altitude = _altitude_agl_or_default(altitude_agl, flight_line.altitude_msl)
return _profiles_for_geometry(
flight_line.geometry,
altitude_agl=altitude,
altitude_msl_ft=flight_line.altitude_msl.m_as("foot"),
source_segment_index=None,
source_segment_name=flight_line.site_name,
source_segment_type="flight_line",
sensor=awp,
ground_speed=speed,
start_time=start_time,
dwell_time_per_los=dwell_time_per_los,
nadir_dwell_time=nadir_dwell_time,
stable_platform_ok=True,
)
[docs]
def awp_profile_locations_for_plan(
plan: gpd.GeoDataFrame,
*,
sensor: AerosolWindProfiler | None = None,
takeoff_time: _dt.datetime | None = None,
dwell_time_per_los: Quantity | None = None,
nadir_dwell_time: Quantity | None = None,
stable_only: bool = True,
terrain_aware: bool = False,
dem_file: str | None = None,
terrain_precision: Quantity | float = 30.0,
wind_aware: bool = False,
) -> gpd.GeoDataFrame:
"""Predict AWP vector-profile locations along stable segments of a plan.
This combines the AWP LOS geometry from Bedka et al. (2024) with the
stable-flight planning heuristics summarized in Bedka (2025). When
``terrain_aware=True``, LOS intercepts are ray-traced into terrain using
a DEM. When ``wind_aware=True``, aircraft heading uses plan crab-angle
metadata (``crab_angle_deg`` or ``wind_corrected_heading``) when present,
so the LOS geometry follows the crabbed aircraft heading rather than the
nominal ground track.
"""
flagged = flag_awp_stable_segments(plan, sensor=sensor)
awp = _as_awp(sensor)
terrain_precision_q = _as_quantity(terrain_precision, "meter", "terrain_precision")
all_profiles: list[gpd.GeoDataFrame] = []
cumulative_seconds = 0.0
for idx, row in flagged.iterrows():
seg_duration_min = row.get("time_to_segment")
seg_duration_s = float(seg_duration_min) * 60.0 if pd.notna(seg_duration_min) else 0.0
geom = row.geometry
stable = bool(row.get("awp_stable_platform_ok", False))
speed = _segment_groundspeed(row)
if (
isinstance(geom, LineString)
and speed is not None
and speed.magnitude > 0
and (stable or not stable_only)
):
alt0 = row.get("start_altitude")
alt1 = row.get("end_altitude")
if pd.notna(alt0) and pd.notna(alt1):
alt_ft = (float(alt0) + float(alt1)) / 2.0
altitude_agl = ureg.Quantity(alt_ft, "foot").to("meter")
seg_start_time = (
takeoff_time + _dt.timedelta(seconds=cumulative_seconds)
if takeoff_time is not None
else None
)
crab_angle_deg = None
heading_deg = None
if wind_aware:
crab_angle = row.get("crab_angle_deg")
heading = row.get("wind_corrected_heading")
if pd.notna(crab_angle):
crab_angle_deg = float(crab_angle)
elif pd.notna(heading):
heading_deg = float(heading)
common_kwargs = {
"sensor": awp,
"ground_speed": speed.to("meter / second"),
"start_time": seg_start_time,
"dwell_time_per_los": dwell_time_per_los,
"nadir_dwell_time": nadir_dwell_time,
"stable_platform_ok": stable,
"crab_angle_deg": crab_angle_deg,
"heading_deg": heading_deg,
}
if terrain_aware:
gdf = _terrain_aware_profiles_for_geometry(
geom,
altitude_msl=ureg.Quantity(alt_ft, "foot"),
altitude_msl_ft=alt_ft,
source_segment_index=int(idx) if isinstance(idx, (int, np.integer)) else None,
source_segment_name=row.get("segment_name"),
source_segment_type=row.get("segment_type"),
dem_file=dem_file,
terrain_precision=terrain_precision_q,
**common_kwargs,
)
else:
gdf = _profiles_for_geometry(
geom,
altitude_agl=altitude_agl,
altitude_msl_ft=alt_ft,
source_segment_index=int(idx) if isinstance(idx, (int, np.integer)) else None,
source_segment_name=row.get("segment_name"),
source_segment_type=row.get("segment_type"),
**common_kwargs,
)
if not gdf.empty:
all_profiles.append(gdf)
cumulative_seconds += max(seg_duration_s, 0.0)
if not all_profiles:
return _empty_profile_gdf()
return gpd.GeoDataFrame(pd.concat(all_profiles, ignore_index=True), geometry="geometry", crs="EPSG:4326")