from machine import UART, Pin, I2C import time import os # ------------------------------ # MPU6050 (GY-521) Accelerometer Setup # ------------------------------ MPU6050_ADDR = 0x68 PWR_MGMT_1 = 0x6B ACCEL_XOUT_H = 0x3B GYRO_XOUT_H = 0x43 # Hardware I2C for MPU6050 (SDA=32, SCL=33) i2c_mpu = I2C(0, scl=Pin(33), sda=Pin(32), freq=400000) i2c_mpu.writeto_mem(MPU6050_ADDR, PWR_MGMT_1, b'\x00') # wake sensor # ------------------------------ # MPU6050 Functions # ------------------------------ def read_raw(reg): high = i2c_mpu.readfrom_mem(MPU6050_ADDR, reg, 1)[0] low = i2c_mpu.readfrom_mem(MPU6050_ADDR, reg+1, 1)[0] val = (high << 8) | low if val > 32767: val -= 65536 return val def read_accel(): ax = read_raw(ACCEL_XOUT_H) / 16384.0 ay = read_raw(ACCEL_XOUT_H+2) / 16384.0 az = read_raw(ACCEL_XOUT_H+4) / 16384.0 return ax, ay, az # ------------------------------ # GPS Tracker Class # ------------------------------ class CarGPSTracker: def __init__(self): # GPS UART (corrected pins: TX=2, RX=4) self.uart = UART(2, baudrate=9600, tx=Pin(2), rx=Pin(4)) # GPS state self.current_lat = 0.0 self.current_lon = 0.0 self.current_alt = 0.0 self.fix_valid = False self.satellites = 0 # Recording state self.track_points = [] self.last_recorded_lat = 0.0 self.last_recorded_lon = 0.0 self.last_point_time = 0 self.is_recording = False self.session_start_time = 0 # Smart filtering self.min_movement_meters = 5.0 self.min_time_between_points = 3000 self.max_time_between_points = 30000 # File management self.csv_file = self.generate_unique_filename() with open(self.csv_file, 'w') as f: f.write("id,latitude,longitude,elevation,time,elapsed_seconds,satellites,movement_meters,speed_mps,speed_kmh," "ax,ay,az,acc_magnitude,qc_flag,reason\n") self.initialize_gps() print("Car GPS Tracker initialized. CSV: {}".format(self.csv_file)) # ------------------------------ # GPS Initialization # ------------------------------ def initialize_gps(self): commands = [ b'$PMTK314,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0*28\r\n', b'$PMTK220,1000*1F\r\n' ] for cmd in commands: self.uart.write(cmd) time.sleep_ms(100) def generate_unique_filename(self, prefix="car_route"): timestamp = time.localtime() # Format with leading zeros manually month = timestamp[1] if timestamp[1] >= 10 else "0{}".format(timestamp[1]) day = timestamp[2] if timestamp[2] >= 10 else "0{}".format(timestamp[2]) hour = timestamp[3] if timestamp[3] >= 10 else "0{}".format(timestamp[3]) minute = timestamp[4] if timestamp[4] >= 10 else "0{}".format(timestamp[4]) second = timestamp[5] if timestamp[5] >= 10 else "0{}".format(timestamp[5]) base_name = "{}_{}{}{}_{}{}{}".format(prefix, timestamp[0], month, day, hour, minute, second) filename = "{}.csv".format(base_name) counter = 1 try: existing_files = os.listdir() except: existing_files = [] while filename in existing_files: filename = "{}_{}.csv".format(base_name, counter) counter += 1 return filename # ------------------------------ # Coordinate Parsing # ------------------------------ def parse_coordinate(self, coord_str, direction): if not coord_str or not direction: return 0.0 try: if direction in ['N','S']: # Latitude ddmm.mmmm degrees = int(coord_str[:2]) minutes = float(coord_str[2:]) else: # Longitude dddmm.mmmm degrees = int(coord_str[:3]) minutes = float(coord_str[3:]) decimal = degrees + (minutes / 60.0) if direction in ['S','W']: decimal = -decimal return round(decimal, 6) # 6 decimals enough for 10cm precision except: return 0.0 def parse_gga(self, sentence): parts = sentence.split(',') if len(parts) >= 15: try: if parts[2] and parts[3]: self.current_lat = self.parse_coordinate(parts[2], parts[3]) if parts[4] and parts[5]: self.current_lon = self.parse_coordinate(parts[4], parts[5]) if parts[6]: self.fix_valid = int(parts[6]) > 0 if parts[7]: self.satellites = int(parts[7]) if parts[9]: self.current_alt = float(parts[9]) except: pass def read_gps(self): if self.uart.any(): try: line = self.uart.readline() if line: sentence = line.decode('utf-8','ignore').strip() if sentence.startswith('$GPGGA') or sentence.startswith('$GNGGA'): self.parse_gga(sentence) return True except: pass return False # ------------------------------ # Distance Calculation # ------------------------------ def calculate_distance(self, lat1, lon1, lat2, lon2): if lat1 == 0 or lon1 == 0 or lat2 == 0 or lon2 == 0: return 0 dlat = (lat2 - lat1) * 111000 dlon = (lon2 - lon1) * 111000 * 0.54 return (dlat*dlat + dlon*dlon) ** 0.5 # ------------------------------ # Smart Filtering # ------------------------------ def should_record_point(self, current_time): if not self.fix_valid or self.current_lat == 0 or self.current_lon == 0: return False, "No GPS fix" if not self.track_points: return True, "First point" time_diff = current_time - self.last_point_time if time_diff < self.min_time_between_points: return False, "Too soon" movement = self.calculate_distance( self.last_recorded_lat, self.last_recorded_lon, self.current_lat, self.current_lon ) if movement >= self.min_movement_meters or time_diff >= self.max_time_between_points: return True, "Moved {:.1f}m".format(movement) return False, "Only moved {:.1f}m".format(movement) # ------------------------------ # Timestamp # ------------------------------ def format_iso_time(self, elapsed_seconds): timestamp = self.session_start_time + int(elapsed_seconds) t = time.localtime(timestamp) return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}Z".format(t[0], t[1], t[2], t[3], t[4], t[5]) # ------------------------------ # Add Track Point # ------------------------------ def add_track_point(self, elapsed_seconds, reason): ax, ay, az = read_accel() acc_mag = (ax**2 + ay**2 + az**2)**0.5 point = { 'id': len(self.track_points)+1, 'lat': self.current_lat, 'lon': self.current_lon, 'alt': self.current_alt, 'elapsed': elapsed_seconds, 'satellites': self.satellites, 'iso_time': self.format_iso_time(elapsed_seconds), 'ax': round(ax,3), 'ay': round(ay,3), 'az': round(az,3), 'acc_magnitude': round(acc_mag,3), 'reason': reason } # Speed calculation if self.track_points: last = self.track_points[-1] movement = self.calculate_distance(last['lat'], last['lon'], self.current_lat, self.current_lon) time_diff = elapsed_seconds - last['elapsed'] speed_mps = movement / time_diff if 0 200: speed_mps = 0 speed_kmh = 0 point['qc_flag'] = "Speed spike" elif acc_mag < 0.05 and speed_kmh>10: point['qc_flag'] = "GPS jump" else: point['qc_flag'] = "" point['movement_m'] = round(movement,1) point['speed_mps'] = round(speed_mps,2) point['speed_kmh'] = round(speed_kmh,1) else: point['movement_m'] = 0 point['speed_mps'] = 0 point['speed_kmh'] = 0 point['qc_flag'] = "" self.track_points.append(point) self.last_recorded_lat = self.current_lat self.last_recorded_lon = self.current_lon # Write immediately to CSV (using format instead of f-strings) with open(self.csv_file, 'a') as f: f.write("{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n".format( point['id'], point['lat'], point['lon'], point['alt'], point['iso_time'], point['elapsed'], point['satellites'], point['movement_m'], point['speed_mps'], point['speed_kmh'], point['ax'], point['ay'], point['az'], point['acc_magnitude'], point['qc_flag'], point['reason'])) f.flush() return point # ------------------------------ # Wait for GPS Fix # ------------------------------ def wait_for_gps_fix(self): print("Waiting for GPS fix...", end="") attempts = 0 while not self.fix_valid: self.read_gps() print(".", end="") time.sleep_ms(500) attempts += 1 if attempts > 120: print("\nGPS fix taking longer than expected") attempts = 0 print("\nGPS fix acquired! {} satellites".format(self.satellites)) print("Current position: {:.6f},{:.6f}".format(self.current_lat, self.current_lon)) # ------------------------------ # Start Recording # ------------------------------ def start_recording(self): print("STARTING GPS RECORDING") self.is_recording = True self.session_start_time = time.time() start_ms = time.ticks_ms() try: while self.is_recording: self.read_gps() current_ms = time.ticks_ms() elapsed_ms = time.ticks_diff(current_ms, start_ms) elapsed_s = elapsed_ms/1000.0 should_record, reason = self.should_record_point(current_ms) if should_record: pt = self.add_track_point(elapsed_s, reason) self.last_point_time = current_ms print("Point {:3d} | {:.6f},{:.6f} | Speed: {:4.1f} km/h | Sats: {} | QC: {} | {}".format( pt['id'], pt['lat'], pt['lon'], pt['speed_kmh'], pt['satellites'], pt['qc_flag'], reason)) time.sleep_ms(100) except KeyboardInterrupt: self.is_recording = False print("\nRecording stopped by user") # ------------------------------ # Test Accelerometer # ------------------------------ def test_accelerometer(self, duration=10): """Test accelerometer readings""" print("Testing GY-521 accelerometer for {} seconds...".format(duration)) start_time = time.time() while time.time() - start_time < duration: ax, ay, az = read_accel() acc_mag = (ax**2 + ay**2 + az**2)**0.5 print("Accel: X={:.3f}, Y={:.3f}, Z={:.3f}, Magnitude={:.3f}g".format(ax, ay, az, acc_mag)) time.sleep(1) # ------------------------------ # System Status # ------------------------------ def show_system_status(self): """Show current system status""" print("\n--- SYSTEM STATUS ---") print("GPS Fix: {}".format("YES" if self.fix_valid else "NO")) if self.fix_valid: print("Position: {:.6f}, {:.6f}".format(self.current_lat, self.current_lon)) print("Altitude: {:.1f}m".format(self.current_alt)) print("Satellites: {}".format(self.satellites)) # Test accelerometer try: ax, ay, az = read_accel() acc_mag = (ax**2 + ay**2 + az**2)**0.5 print("Accelerometer: X={:.3f}, Y={:.3f}, Z={:.3f} (Mag: {:.3f}g)".format(ax, ay, az, acc_mag)) print("GY-521 Status: OK") except Exception as e: print("GY-521 Status: ERROR - {}".format(e)) print("Data file: {}".format(self.csv_file)) print("Recorded points: {}".format(len(self.track_points))) print("Recording: {}".format("YES" if self.is_recording else "NO")) # ------------------------------ # Main # ------------------------------ def main(): print("=== GPS + GY-521 Tracker ===") print("Hardware setup:") print(" GPS: TX->Pin2, RX->Pin4, VCC->3.3V/5V, GND->GND") print(" GY-521: SDA->Pin32, SCL->Pin33, VCC->3.3V, GND->GND") print() try: # Initialize tracker tracker = CarGPSTracker() # Show system status tracker.show_system_status() # Test accelerometer first print("\nTesting accelerometer...") tracker.test_accelerometer(duration=5) # Wait for GPS fix tracker.wait_for_gps_fix() # Start recording tracker.start_recording() except Exception as e: print("Error: {}".format(e)) print("Check wiring and connections!") if __name__=="__main__": main()