matianxing1992 | b95d194 | 2025-01-28 15:04:02 -0600 | [diff] [blame^] | 1 | # -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */ |
| 2 | # |
| 3 | # Copyright (C) 2015-2025, The University of Memphis, |
| 4 | # Arizona Board of Regents, |
| 5 | # Regents of the University of California. |
| 6 | # |
| 7 | # This file is part of Mini-NDN. |
| 8 | # See AUTHORS.md for a complete list of Mini-NDN authors and contributors. |
| 9 | # |
| 10 | # Mini-NDN is free software: you can redistribute it and/or modify |
| 11 | # it under the terms of the GNU General Public License as published by |
| 12 | # the Free Software Foundation, either version 3 of the License, or |
| 13 | # (at your option) any later version. |
| 14 | # |
| 15 | # Mini-NDN is distributed in the hope that it will be useful, |
| 16 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 17 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 18 | # GNU General Public License for more details. |
| 19 | # |
| 20 | # You should have received a copy of the GNU General Public License |
| 21 | # along with Mini-NDN, e.g., in COPYING.md file. |
| 22 | # If not, see <http://www.gnu.org/licenses/>. |
| 23 | |
| 24 | import threading |
| 25 | import math |
| 26 | import time |
| 27 | import datetime |
| 28 | from typing import Optional, Tuple |
| 29 | |
| 30 | from minindn.apps.application import Application |
| 31 | |
| 32 | |
class Gpsd(Application):
    """
    Simulate a GPS receiver for a node and feed its fixes to a local gpsd.

    The node's Cartesian position (meters, relative to a reference point whose
    geographic coordinates are given at construction time) is converted to
    latitude/longitude/altitude, encoded as NMEA GGA, RMC and VTG sentences and
    sent over UDP to the gpsd instance launched by :meth:`start`.

    Axis convention used throughout this class: x points east, y points north.

    To use this class, you need to install gpsd and netcat (nc) for communication.
    """

    # Conversion factors for a ground speed expressed in meters per second.
    _KNOTS_PER_MPS = 1.94384
    _KMH_PER_MPS = 3.6
    # Approximate length of one degree of latitude, in meters.
    _METERS_PER_DEGREE = 111000.0

    def __init__(self, node, lat: float = 0.0, lon: float = 0.0, altitude: float = 0.0, update_interval: float = 0.2) -> None:
        """
        Initialize the GPSd application for a node.

        :param node: The node for which the GPS data will be provided.
        :param lat: Latitude (degrees) of the point (0, 0, 0).
        :param lon: Longitude (degrees) of the point (0, 0, 0).
        :param altitude: Altitude (meters) of the point (0, 0, 0).
        :param update_interval: Seconds between two updates sent to gpsd. The value is
            not validated here; the original guidance is to keep it at 0.2 or above.
        """
        Application.__init__(self, node)
        self.lat = lat
        self.lon = lon
        self.altitude = altitude
        self.update_interval = update_interval
        # Set by stop() to make the feeder thread leave its loop.
        self.stop_event = threading.Event()
        # Feeder thread; None while the application is not running.
        self.location_thread = None

    def calculate_coordinates(self, offset_lat: float, offset_lon: float, altitude_offset: float) -> Tuple[float, float, float]:
        """
        Calculate the geographic coordinates of a point given its offsets from the reference point.

        Uses a flat-earth approximation (one degree of latitude is about 111 km), so it is
        only meaningful for small offsets and away from the poles.

        :param offset_lat: Northward offset (unit: meters)
        :param offset_lon: Eastward offset (unit: meters)
        :param altitude_offset: Altitude offset (unit: meters)
        :return: New GPS coordinates (latitude, longitude, altitude)
        """
        meters_per_degree = Gpsd._METERS_PER_DEGREE
        lat_offset_deg = offset_lat / meters_per_degree
        # A degree of longitude shrinks with the cosine of the reference latitude.
        lon_offset_deg = offset_lon / (meters_per_degree * math.cos(math.radians(self.lat)))

        return (
            self.lat + lat_offset_deg,
            self.lon + lon_offset_deg,
            self.altitude + altitude_offset,
        )

    @staticmethod
    def nmea_checksum(sentence: str) -> str:
        """
        Calculate the checksum for an NMEA sentence.

        :param sentence: The sentence payload to checksum; callers in this class pass the
            text that goes between the leading '$' and the '*' separator.
        :return: XOR of all character codes, as two uppercase hexadecimal digits.
        """
        checksum = 0
        for char in sentence:
            checksum ^= ord(char)
        return f"{checksum:02X}"

    @staticmethod
    def _format_nmea_angle(value: float, degree_width: int) -> str:
        """
        Format the magnitude of an angle in degrees as NMEA '[d]ddmm.mmmm'.

        :param value: Angle in degrees; the sign is ignored (hemisphere is a separate field).
        :param degree_width: Number of digits for the degrees part (2 for latitude, 3 for longitude).
        :return: Zero-padded degrees followed by minutes with four decimals.
        """
        magnitude = abs(value)
        degrees = int(magnitude)
        minutes = round((magnitude - degrees) * 60, 4)
        # Rounding can push the minutes up to exactly 60.0000, which is not a valid
        # minutes value; carry it into the degrees instead.
        if minutes >= 60.0:
            degrees += 1
            minutes = 0.0
        return f"{degrees:0{degree_width}d}{minutes:07.4f}"

    @staticmethod
    def _true_course(vx: float, vy: float) -> float:
        """
        Return the compass bearing of a velocity vector, rounded to one decimal.

        :param vx: Eastward velocity component.
        :param vy: Northward velocity component.
        :return: Bearing in degrees clockwise from true north, in [0, 360).
        """
        # atan2(east, north) measures the angle clockwise from north, i.e. a compass
        # bearing, as opposed to the mathematical atan2(y, x) measured from east.
        bearing = math.degrees(math.atan2(vx, vy)) % 360.0
        # Round first so a value such as 359.97 wraps to 0.0 rather than printing 360.0.
        return round(bearing, 1) % 360.0

    @staticmethod
    def generate_vtg_sentence(vx: float, vy: float) -> str:
        """
        Generate a NMEA VTG sentence based on a 2D velocity vector.

        :param vx: Velocity in the x-direction (eastward, in m/s)
        :param vy: Velocity in the y-direction (northward, in m/s)
        :return: The corresponding NMEA VTG sentence, including '$' and checksum
        """
        ground_speed = math.hypot(vx, vy)  # m/s
        ground_speed_knots = ground_speed * Gpsd._KNOTS_PER_MPS
        ground_speed_kmh = ground_speed * Gpsd._KMH_PER_MPS
        true_course = Gpsd._true_course(vx, vy)

        nmea_sentence = f"GPVTG,{true_course:.1f},T,,M,{ground_speed_knots:.2f},N,{ground_speed_kmh:.2f},K"

        return f"${nmea_sentence}*{Gpsd.nmea_checksum(nmea_sentence)}"

    @staticmethod
    def generate_gga_sentence(lat: float, lon: float, altitude: float, utc_time: Optional[str] = None) -> str:
        """
        Convert latitude, longitude, and altitude into a NMEA GGA sentence.

        :param lat: Latitude (unit: degrees, negative for south)
        :param lon: Longitude (unit: degrees, negative for west)
        :param altitude: Altitude (unit: meters)
        :param utc_time: UTC time in 'hhmmss.sss' format; the current time is used when omitted
        :return: Formatted NMEA GGA sentence, including '$' and checksum
        """
        lat_direction = 'N' if lat >= 0 else 'S'
        lon_direction = 'E' if lon >= 0 else 'W'
        lat_field = Gpsd._format_nmea_angle(lat, 2)
        lon_field = Gpsd._format_nmea_angle(lon, 3)

        if utc_time is None:
            # %f yields microseconds; drop the last three digits to keep milliseconds.
            utc_time = datetime.datetime.now(datetime.timezone.utc).strftime("%H%M%S.%f")[:-3]

        nmea_sentence = f"GPGGA,{utc_time},{lat_field},{lat_direction},{lon_field},{lon_direction},1,12,1.0,{altitude:.1f},M,0.0,M,,"

        return f"${nmea_sentence}*{Gpsd.nmea_checksum(nmea_sentence)}"

    @staticmethod
    def generate_rmc_sentence(lat: float, lon: float, vx: float, vy: float, utc_time: Optional[str] = None, date: Optional[str] = None) -> str:
        """
        Generate an NMEA GPRMC sentence using vx and vy (speed components along X and Y axes).

        :param lat: Latitude (degrees, negative for south)
        :param lon: Longitude (degrees, negative for west)
        :param vx: Speed along the X-axis (eastward, in knots)
        :param vy: Speed along the Y-axis (northward, in knots)
        :param utc_time: UTC time in 'hhmmss.sss' format; the current time is used when omitted
        :param date: UTC date in 'ddmmyy' format; the current date is used when omitted
        :return: Formatted NMEA GPRMC sentence, including '$' and checksum
        """
        speed = math.hypot(vx, vy)  # ground speed, same unit as the inputs (knots)
        # Use the same north-referenced bearing as the VTG sentence so both agree.
        course = Gpsd._true_course(vx, vy)

        lat_direction = 'N' if lat >= 0 else 'S'
        lon_direction = 'E' if lon >= 0 else 'W'
        lat_field = Gpsd._format_nmea_angle(lat, 2)
        lon_field = Gpsd._format_nmea_angle(lon, 3)

        # Take a single clock reading so a defaulted time and date belong to the same instant.
        if utc_time is None or date is None:
            now = datetime.datetime.now(datetime.timezone.utc)
            if utc_time is None:
                utc_time = now.strftime("%H%M%S.%f")[:-3]  # hhmmss.sss
            if date is None:
                date = now.strftime("%d%m%y")  # ddmmyy

        status = "A"  # A = Active, V = Void (No fix)
        nmea_sentence = f"GPRMC,{utc_time},{status},{lat_field},{lat_direction},{lon_field},{lon_direction},{speed:.1f},{course:.1f},{date},,,A"

        return f"${nmea_sentence}*{Gpsd.nmea_checksum(nmea_sentence)}"

    def __feedGPStoGPSD(self, node) -> None:
        """
        Continuously feed GPS data to the GPSD server until stop_event is set.

        Each cycle reads the node position once, derives the velocity from the previous
        reading, and sends one GGA, one RMC and one VTG sentence to UDP port 7150 on
        127.0.0.1 from within the node.

        :param node: Node whose ``position`` is read and whose ``popen`` runs the sender.
            NOTE(review): assumes ``position`` is an (x, y, z) sequence in meters - confirm.
        """
        # Copy into a tuple so a later in-place update of node.position cannot alias
        # the previous reading and zero out the computed velocity.
        previous_position = tuple(node.position)
        previous_time = time.monotonic()

        # Event.wait() paces the loop and returns True as soon as stop() is called,
        # so shutdown does not have to wait out a full update interval.
        while not self.stop_event.wait(self.update_interval):
            position = tuple(node.position)
            now_monotonic = time.monotonic()
            x, y, z = position[0], position[1], position[2]

            # x is the eastward offset (longitude) and y the northward offset (latitude),
            # matching the velocity convention used by the VTG/RMC sentences.
            lat, lon, altitude = self.calculate_coordinates(y, x, z)

            elapsed = now_monotonic - previous_time
            if elapsed > 0:
                vx = (x - previous_position[0]) / elapsed
                vy = (y - previous_position[1]) / elapsed
            else:
                # No measurable time has passed; report the node as stationary
                # rather than dividing by zero.
                vx = vy = 0.0
            previous_position = position
            previous_time = now_monotonic

            # Stamp all sentences of this fix with the same instant.
            now_utc = datetime.datetime.now(datetime.timezone.utc)
            utc_time = now_utc.strftime("%H%M%S.%f")[:-3]
            date = now_utc.strftime("%d%m%y")

            gga_sentence = self.generate_gga_sentence(lat, lon, altitude, utc_time)
            # RMC expects knots, while the velocity computed above is in m/s.
            rmc_sentence = self.generate_rmc_sentence(
                lat, lon, vx * Gpsd._KNOTS_PER_MPS, vy * Gpsd._KNOTS_PER_MPS, utc_time, date
            )
            vtg_sentence = self.generate_vtg_sentence(vx, vy)

            cmd = f"echo '{gga_sentence}\n{rmc_sentence}\n{vtg_sentence}\n' | nc -u -w 1 127.0.0.1 7150"
            # Fire and forget: the loop does not wait for nc to finish.
            node.popen(cmd, shell=True)

    def start(self) -> None:
        """
        Launch gpsd on the node and start a thread that periodically sends GPS data to it.

        Calling this while the feeder thread is already running does nothing.
        """
        if self.location_thread is not None and self.location_thread.is_alive():
            return

        # A previous stop() leaves the event set; clear it so the new thread keeps running.
        self.stop_event.clear()
        Application.start(self, command="gpsd -n udp://127.0.0.1:7150")

        self.location_thread = threading.Thread(target=self.__feedGPStoGPSD, args=(self.node,))
        self.location_thread.start()

    def stop(self) -> None:
        """
        Stop the feeder thread (if any) and then stop the application itself.

        Safe to call before start() and safe to call more than once.
        """
        self.stop_event.set()
        if self.location_thread is not None:
            self.location_thread.join()
            self.location_thread = None
        Application.stop(self)