blob: 7ec79d066220c1affc37bb7255317299b764df28 [file] [log] [blame]
matianxing1992b95d1942025-01-28 15:04:02 -06001# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2#
3# Copyright (C) 2015-2025, The University of Memphis,
4# Arizona Board of Regents,
5# Regents of the University of California.
6#
7# This file is part of Mini-NDN.
8# See AUTHORS.md for a complete list of Mini-NDN authors and contributors.
9#
10# Mini-NDN is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# Mini-NDN is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with Mini-NDN, e.g., in COPYING.md file.
22# If not, see <http://www.gnu.org/licenses/>.
23
24import threading
25import math
26import time
27import datetime
28from typing import Optional, Tuple
29
30from minindn.apps.application import Application
31
32
class Gpsd(Application):
    """
    Simulate a GPS receiver for a node and feed its fixes to a local gpsd daemon.

    The node's position (read from ``node.position`` as x/y/z offsets in meters)
    is translated into latitude/longitude/altitude relative to a reference point,
    encoded as NMEA GGA, RMC and VTG sentences, and pushed over UDP to gpsd.

    To use this class, you need to install gpsd and netcat (nc) for communication.
    """

    # Approximate length of one degree of latitude, in meters.
    _METERS_PER_DEGREE = 111000.0
    # Unit conversion factors for speeds expressed in meters per second.
    _MPS_TO_KNOTS = 1.94384
    _MPS_TO_KMH = 3.6

    def __init__(self, node, lat: float = 0.0, lon: float = 0.0, altitude: float = 0.0, update_interval: float = 0.2) -> None:
        """
        Initialize the GPSd application for a node.

        :param node: The node for which the GPS data will be provided.
        :param lat: Latitude (degrees) of the reference point, i.e. position (0, 0, 0).
        :param lon: Longitude (degrees) of the reference point, i.e. position (0, 0, 0).
        :param altitude: Altitude (meters) of the reference point, i.e. position (0, 0, 0).
        :param update_interval: Time in seconds between two updates sent to gpsd;
                                0.2 or more is recommended.
        """
        Application.__init__(self, node)
        self.lat = lat
        self.lon = lon
        self.altitude = altitude
        self.update_interval = update_interval
        # Set by stop() to make the feeder thread leave its loop.
        self.stop_event = threading.Event()
        # Feeder thread; None while the application is not running.
        self.location_thread = None

    def calculate_coordinates(self, offset_lat: float, offset_lon: float, altitude_offset: float) -> Tuple[float, float, float]:
        """
        Calculate the coordinates of a point given as an offset from the reference point.

        Uses a flat-earth approximation, so the longitude result becomes
        meaningless when the reference latitude is at (or very near) a pole.

        :param offset_lat: Latitude offset (unit: meters)
        :param offset_lon: Longitude offset (unit: meters)
        :param altitude_offset: Altitude offset (unit: meters)
        :return: New GPS coordinates (latitude, longitude, altitude)
        """
        # Each degree of latitude is approximately 111 kilometers.
        lat_offset_deg = offset_lat / self._METERS_PER_DEGREE
        # The distance covered by one degree of longitude shrinks with cos(latitude).
        meters_per_lon_degree = self._METERS_PER_DEGREE * math.cos(math.radians(self.lat))
        lon_offset_deg = offset_lon / meters_per_lon_degree

        return (self.lat + lat_offset_deg,
                self.lon + lon_offset_deg,
                self.altitude + altitude_offset)

    @staticmethod
    def nmea_checksum(sentence: str) -> str:
        """
        Calculate the checksum for an NMEA sentence.

        :param sentence: The sentence payload, i.e. the text between '$' and '*' (both excluded).
        :return: The XOR of all character codes as two uppercase hex digits.
        """
        checksum = 0
        for char in sentence:
            checksum ^= ord(char)
        return f"{checksum:02X}"

    @staticmethod
    def _wrap_sentence(payload: str) -> str:
        """Frame an NMEA payload as '$<payload>*<checksum>'."""
        return f"${payload}*{Gpsd.nmea_checksum(payload)}"

    @staticmethod
    def _format_coordinate(value: float, degree_width: int) -> str:
        """
        Format an angle in degrees as NMEA '(d)ddmm.mmmm' (hemisphere not included).

        :param value: Latitude or longitude in degrees; only the magnitude is used.
        :param degree_width: Number of digits for the degrees part (2 for latitude, 3 for longitude).
        """
        magnitude = abs(value)
        degrees = int(magnitude)
        # Round before formatting so that e.g. 59.99996' carries into the degrees
        # instead of being printed as the invalid value "60.0000".
        minutes = round((magnitude - degrees) * 60, 4)
        if minutes >= 60.0:
            degrees += 1
            minutes = 0.0
        return f"{degrees:0{degree_width}d}{minutes:07.4f}"

    @staticmethod
    def _course_over_ground(vx: float, vy: float) -> float:
        """
        Return the course in degrees, clockwise from true north, in [0.0, 360.0).

        :param vx: Eastward velocity component.
        :param vy: Northward velocity component.
        """
        # atan2(east, north) yields the compass bearing. Rounding to the printed
        # precision before the modulo keeps 359.97 from being emitted as "360.0".
        return round(math.degrees(math.atan2(vx, vy)), 1) % 360.0

    @staticmethod
    def _utc_now() -> datetime.datetime:
        """Return the current timezone-aware UTC time."""
        return datetime.datetime.now(datetime.timezone.utc)

    @staticmethod
    def _format_utc_time(moment: datetime.datetime) -> str:
        """Format a datetime as NMEA 'hhmmss.sss' (microseconds truncated to milliseconds)."""
        return moment.strftime("%H%M%S.%f")[:-3]

    @staticmethod
    def generate_vtg_sentence(vx: float, vy: float) -> str:
        """
        Generate a NMEA VTG sentence based on a 2D velocity vector.

        :param vx: Velocity in the x-direction (eastward, in m/s)
        :param vy: Velocity in the y-direction (northward, in m/s)
        :return: The corresponding NMEA VTG sentence
        """
        ground_speed = math.hypot(vx, vy)  # Ground speed in m/s
        ground_speed_knots = ground_speed * Gpsd._MPS_TO_KNOTS
        ground_speed_kmh = ground_speed * Gpsd._MPS_TO_KMH
        true_course = Gpsd._course_over_ground(vx, vy)

        payload = f"GPVTG,{true_course:.1f},T,,M,{ground_speed_knots:.2f},N,{ground_speed_kmh:.2f},K"
        return Gpsd._wrap_sentence(payload)

    @staticmethod
    def generate_gga_sentence(lat: float, lon: float, altitude: float, utc_time: Optional[str] = None) -> str:
        """
        Convert latitude, longitude, and altitude into a NMEA GGA sentence.

        :param lat: Latitude (unit: degrees, negative for south)
        :param lon: Longitude (unit: degrees, negative for west)
        :param altitude: Altitude (unit: meters)
        :param utc_time: UTC time in 'hhmmss.sss' format; defaults to the current time
        :return: Formatted NMEA GGA sentence
        """
        lat_direction = 'N' if lat >= 0 else 'S'
        lon_direction = 'E' if lon >= 0 else 'W'
        lat_field = Gpsd._format_coordinate(lat, 2)
        lon_field = Gpsd._format_coordinate(lon, 3)

        if utc_time is None:
            utc_time = Gpsd._format_utc_time(Gpsd._utc_now())

        # The fields after the position are fixed values emitted verbatim:
        # "1,12,1.0", then the altitude in meters, then "0.0,M" and two empty fields.
        payload = (f"GPGGA,{utc_time},{lat_field},{lat_direction},{lon_field},{lon_direction},"
                   f"1,12,1.0,{altitude:.1f},M,0.0,M,,")
        return Gpsd._wrap_sentence(payload)

    @staticmethod
    def generate_rmc_sentence(lat: float, lon: float, vx: float, vy: float, utc_time: Optional[str] = None, date: Optional[str] = None) -> str:
        """
        Generate an NMEA GPRMC sentence using vx and vy (speed components along X and Y axes).

        :param lat: Latitude (degrees, negative for south)
        :param lon: Longitude (degrees, negative for west)
        :param vx: Speed along the X-axis (eastward, in knots)
        :param vy: Speed along the Y-axis (northward, in knots)
        :param utc_time: UTC time in 'hhmmss.sss' format; defaults to the current time
        :param date: UTC date in 'ddmmyy' format; defaults to the current date
        :return: Formatted NMEA GPRMC sentence
        """
        speed = math.hypot(vx, vy)  # Speed over ground, same unit as the inputs (knots)
        # Course is measured clockwise from true north, exactly as in the VTG sentence.
        course = Gpsd._course_over_ground(vx, vy)

        lat_direction = 'N' if lat >= 0 else 'S'
        lon_direction = 'E' if lon >= 0 else 'W'
        lat_field = Gpsd._format_coordinate(lat, 2)
        lon_field = Gpsd._format_coordinate(lon, 3)

        # Take a single clock reading so that time and date are always consistent.
        if utc_time is None or date is None:
            now = Gpsd._utc_now()
            if utc_time is None:
                utc_time = Gpsd._format_utc_time(now)
            if date is None:
                date = now.strftime("%d%m%y")  # ddmmyy

        status = "A"  # A = Active, V = Void (No fix)
        payload = (f"GPRMC,{utc_time},{status},{lat_field},{lat_direction},{lon_field},{lon_direction},"
                   f"{speed:.1f},{course:.1f},{date},,,A")
        return Gpsd._wrap_sentence(payload)

    def __feedGPStoGPSD(self, node) -> None:
        """
        Continuously feed GPS data to the GPSD server until stop() is requested.

        Every ``update_interval`` seconds the node position is sampled, the
        velocity is estimated from the previous sample, and one GGA, one RMC
        and one VTG sentence are sent to gpsd through netcat.

        :param node: The node whose ``position`` is sampled and on which nc is run.
        """
        previous_position = node.position
        previous_time = time.monotonic()
        pending = []  # nc helper processes that have not been reaped yet

        # Event.wait() acts as an interruptible sleep: it returns True as soon
        # as stop() sets the event, so shutdown does not wait a whole interval.
        while not self.stop_event.wait(self.update_interval):
            # Sample the position once so that the fix and the velocity agree.
            position = node.position
            now = time.monotonic()
            lat, lon, altitude = self.calculate_coordinates(position[0], position[1], position[2])

            elapsed = now - previous_time
            if elapsed > 0:
                vx = (position[0] - previous_position[0]) / elapsed
                vy = (position[1] - previous_position[1]) / elapsed
            else:
                # No measurable time has passed; report a standstill rather than divide by zero.
                vx = vy = 0.0
            previous_position = position
            previous_time = now

            gga_sentence = self.generate_gga_sentence(lat, lon, altitude)
            # Positions are in meters, so vx/vy are in m/s; RMC expects knots.
            rmc_sentence = self.generate_rmc_sentence(lat, lon, vx * self._MPS_TO_KNOTS, vy * self._MPS_TO_KNOTS)
            vtg_sentence = self.generate_vtg_sentence(vx, vy)

            cmd = f"echo '{gga_sentence}\n{rmc_sentence}\n{vtg_sentence}\n' | nc -u -w 1 127.0.0.1 7150"
            # poll() reaps helpers that already exited, so they do not pile up as zombies.
            pending = [proc for proc in pending if proc.poll() is None]
            pending.append(node.popen(cmd, shell=True))

        # Reap the last helpers; the command passes "-w 1" to nc to bound its run time.
        for proc in pending:
            proc.wait()

    def start(self) -> None:
        """
        Start gpsd and a thread that periodically sends GPS data for the node.
        """
        Application.start(self, command="gpsd -n udp://127.0.0.1:7150")

        # Do not spawn a second feeder if one is already running.
        if self.location_thread is not None and self.location_thread.is_alive():
            return
        # Clear a stop request left over from a previous run so a restart works.
        self.stop_event.clear()
        self.location_thread = threading.Thread(target=self.__feedGPStoGPSD, args=(self.node,))
        self.location_thread.start()

    def stop(self) -> None:
        """
        Stop the feeder thread (if any) and then the gpsd process.
        """
        self.stop_event.set()
        feeder = self.location_thread
        # The thread only exists after start(); stop() may be called without it.
        if feeder is not None:
            feeder.join()
            self.location_thread = None
        Application.stop(self)