"""Small HTTP front end for an Assetto Corsa server.

Serves an HTML page, JSON listings of tracks and cars, and preview images
obtained through the ``ac_maps`` / ``ac_cars`` helpers. A POST to
``/changetrack/<track>[/<config>]`` calls ``ac_maps.change_track`` and then
restarts the ``assetto-corsa-server.service`` systemd unit over D-Bus.
"""

import html
import json
import mimetypes
import os
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, unquote

import dbus

from ac_cars import get_all_cars, get_car_image
from ac_maps import (
    change_track,
    get_all_tracks,
    get_configs,
    get_outline_image,
    get_preview_image,
)

HOST = "0.0.0.0"
PORT = 10303
SERVICE_UNIT = "assetto-corsa-server.service"


class Handler(BaseHTTPRequestHandler):
    """Request handler routing GET and POST requests to the ``handle_*`` methods.

    Every ``handle_*`` method returns ``True`` once it has written a complete
    response (success or error) and ``False`` when the path is not one it
    serves, in which case the caller answers with 404.
    """

    def do_GET(self):
        """Serve a GET request, answering 404 for unknown paths."""
        if not self._normalize_path():
            return
        if self.handle_GET_path():
            return
        self.send_error(404)

    def do_POST(self):
        """Serve a POST request, answering 404 for unknown paths."""
        # Decode here as well so names containing spaces behave the same for
        # GET and POST.
        if not self._normalize_path():
            return
        if self.handle_POST_path():
            return
        self.send_error(404)

    def _normalize_path(self) -> bool:
        """Percent-decode ``self.path`` in place and reject unsafe paths.

        The query string, if any, is dropped; no route uses it.

        Returns:
            True if ``self.path`` was updated; False if the path was rejected
            and a 400 response has already been sent.
        """
        decoded = unquote(self.path.split("?", 1)[0])
        # Path segments are handed to helpers that resolve files on disk, so
        # refuse parent-directory references and embedded NUL bytes up front.
        if "\x00" in decoded or ".." in decoded.split("/"):
            self.send_error(400, "Invalid path")
            return False
        self.path = decoded
        return True

    def _send_json(self, status: int, payload: dict) -> None:
        """Write ``payload`` as a JSON response with the given HTTP status."""
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def handle_GET_path(self) -> bool:
        """Dispatch a GET request based on ``self.path``.

        Returns:
            True if a response was sent and the caller should return,
            False if no route matched.
        """
        if self.path.startswith("/img/"):
            return self.handle_get_img_path()
        if self.path.startswith("/track/"):
            return self.handle_get_track_path()
        if self.path == "/cars":
            return self.handle_get_cars_path()
        if self.path in ("/", "/index.html"):
            return self.handle_get_root_path()
        return False

    def handle_get_img_path(self) -> bool:
        """Serve ``/img/<type>/...`` where type is preview, outline or car.

        ``preview`` and ``outline`` take ``<track>[/<config>]``; ``car`` takes
        ``<car>[/<skin>]``. An outline request falls back to the preview image
        when no outline path is returned. Unknown types get a 404.
        """
        parts = [p for p in self.path.split("/") if p]
        img_type = parts[1] if len(parts) > 1 else ""

        if img_type == "preview":
            track, config = self.extract_track_and_config(parts)
            img_path = get_preview_image(track, config)
        elif img_type == "outline":
            track, config = self.extract_track_and_config(parts)
            img_path = get_outline_image(track, config)
            if not img_path:
                img_path = get_preview_image(track, config)
        elif img_type == "car":
            car = parts[2] if len(parts) > 2 else ""
            skin = parts[3] if len(parts) > 3 else ""
            img_path = get_car_image(car, skin)
        else:
            self.send_error(404)
            return True

        self.send_image(img_path)
        return True

    def handle_get_track_path(self) -> bool:
        """Serve ``/track/<track>`` as JSON with its configs and image URLs."""
        # removeprefix only strips the leading route; str.replace would also
        # delete any later "/track/" occurrence inside the name.
        track = self.path.removeprefix("/track/").strip("/")
        data = {
            "track": track,
            "configs": get_configs(track),
            "image": f"/img/preview/{track}",
            "outline": f"/img/outline/{track}",
        }
        self._send_json(200, data)
        return True

    def handle_get_cars_path(self) -> bool:
        """Serve ``/cars`` as JSON: ``{"cars": get_all_cars()}``."""
        self._send_json(200, {"cars": get_all_cars()})
        return True

    def handle_get_root_path(self) -> bool:
        """Serve ``index.html`` with its ``{{tracks}}`` placeholder filled in.

        The template is read from the current working directory. The
        ``{{configs}}`` placeholder is replaced with an empty string.
        """
        try:
            with open("index.html", "r", encoding="utf-8") as f:
                page = f.read()
        except OSError:
            self.send_error(500, "index.html could not be read")
            return True

        # NOTE(review): the original f-string here was empty (f''), so the
        # placeholder was always replaced with nothing. Reconstructed as
        # <option> elements based on the variable name - confirm against what
        # index.html expects. str() assumes each track is a plain name.
        track_options = "".join(
            f'<option value="{html.escape(str(t), quote=True)}">'
            f"{html.escape(str(t))}</option>"
            for t in get_all_tracks()
        )
        page = page.replace("{{tracks}}", track_options)
        page = page.replace("{{configs}}", "")  # empty at load

        body = page.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
        return True

    def handle_POST_path(self) -> bool:
        """Handle ``POST /changetrack/<track>[/<config>]``.

        Calls ``change_track`` and, if it reports success, restarts the game
        server unit. Responds with JSON ``{"status", "message"}``: 200 on
        success, 400 if either step failed.

        Returns:
            True if a response was sent, False if the path is not handled.
        """
        if not self.path.startswith("/changetrack/"):
            return False

        parts = [p for p in self.path.split("/") if p]
        track, config = self.extract_track_and_config(parts, 1)
        print(f"Changing track to '{track}' with config '{config}'")

        success, message = change_track(track, config)
        if success and not self._restart_game_server():
            success = False
            message = f"Failed to restart {SERVICE_UNIT}"
        print(message)

        if success:
            self._send_json(200, {"status": "success", "message": message})
        else:
            self._send_json(400, {"status": "error", "message": message})
        return True

    def _restart_game_server(self) -> bool:
        """Ask systemd over the system D-Bus to restart ``SERVICE_UNIT``.

        Returns:
            True if systemd returned a job for the restart, False if it did
            not or if the D-Bus call raised.
        """
        try:
            sysbus = dbus.SystemBus()
            systemd1 = sysbus.get_object(
                "org.freedesktop.systemd1", "/org/freedesktop/systemd1"
            )
            manager = dbus.Interface(systemd1, "org.freedesktop.systemd1.Manager")
            job = manager.RestartUnit(SERVICE_UNIT, "replace")
        except dbus.DBusException as exc:
            # Without this the exception would abort the request and the
            # client would never receive the JSON error response.
            print(f"Failed to restart {SERVICE_UNIT}: {exc}")
            return False

        if job:
            print(f"Successfully restarted {SERVICE_UNIT}")
            return True
        print(f"Failed to restart {SERVICE_UNIT}")
        return False

    def send_image(self, img_path: str):
        """Send the file at ``img_path`` as an image, or 404 if unavailable.

        The content type is guessed from the file name and falls back to
        ``image/png`` when it cannot be determined.
        """
        if not img_path or not os.path.isfile(img_path):
            self.send_error(404)
            return
        try:
            with open(img_path, "rb") as f:
                payload = f.read()
        except OSError:
            # The file vanished or is unreadable; read before sending headers
            # so a proper error status can still be returned.
            self.send_error(404)
            return

        content_type = mimetypes.guess_type(img_path)[0] or "image/png"
        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def extract_track_and_config(
        self, parts: list, track_index: int = 2
    ) -> tuple[str, str]:
        """Pick the track and config names out of the split URL path.

        Args:
            parts: Non-empty path segments of the request URL.
            track_index: Position of the track segment; the config, if
                present, is the segment right after it.

        Returns:
            ``(track, config)``; each is ``""`` when its segment is missing.
        """
        track = parts[track_index] if len(parts) > track_index else ""
        config = parts[track_index + 1] if len(parts) > track_index + 1 else ""
        return track, config


# The server starts as soon as this module is executed.
server = HTTPServer((HOST, PORT), Handler)
print(f"Server running on port {PORT}")
server.serve_forever()