diff --git a/src/api_communication/api_communication/api_listener.py b/src/api_communication/api_communication/api_listener.py index 2fb30280..4f3f5a8a 100644 --- a/src/api_communication/api_communication/api_listener.py +++ b/src/api_communication/api_communication/api_listener.py @@ -22,16 +22,9 @@ class ApiListener(Node): self.message_queue = [] self.checking_for_message = False - # self.server = None - # self.server_thread = threading.Thread(target=self.start_api_thread,daemon=True) - # self.server_thread.start() - - self.get_logger().info('Starting API') - self.websocket = websockets.connect('ws://100.100.100.106:9001') - self.get_logger().info('API started') - consumer_thread = threading.Thread(asyncio.run(self.consume_messages())) - consumer_thread.start() - + self.server = None + self.server_thread = threading.Thread(target=self.start_api_thread,daemon=True) + self.server_thread.start() def drone_status_callback(self, msg): self.last_battery_percentage = msg.battery_percentage @@ -39,47 +32,41 @@ class ApiListener(Node): def start_api_thread(self): asyncio.run(self.handle_api()) - - def on_message(self,message): - self.get_logger().info(f"Callback received message: {message}") - - async def consume_messages(self): - async for message in self.websocket: - self.get_logger().info(f"Consumer message: {message}") - self.on_message(message) - async def handle_api(self): self.get_logger().info('Starting API') - self.websocket = websockets.connect('ws://100.100.100.106:9001') + self.server = await websockets.serve(self.api_handler, '0.0.0.0', 9001) self.get_logger().info('API started') - consumer_thread = threading.Thread(asyncio.run(self.consume_messages())) - consumer_thread.start() + await self.server.wait_closed() - # async def handle_message_receive(self,websocket): - # self.last_message = await websocket.recv() + async def handle_message_receive(self,websocket): + self.get_logger().info(f"Received message callback: {self.last_message}") + self.last_message = await websocket.recv() - # def message_received_callback(self): - # self.get_logger().info(f"Received message callback: {self.last_message}") - # self.message_queue.append(self.last_message) - # self.checking_for_message = False + def message_received_callback(self): + self.get_logger().info(f"Received message callback: {self.last_message}") + self.message_queue.append(self.last_message) + self.checking_for_message = False # def handle_message(self, message): # deserialized_msg = json.loads(message) - # async def api_handler(self, websocket): - # try: - # while True: - # if not self.checking_for_message: - # self.get_logger().info('Waiting for message') - # self.checking_for_message = True - # task = asyncio.create_task(self.handle_message_receive(websocket)) - # task.add_done_callback(self.message_received_callback) - # if len(self.message_queue) > 0: - # websocket.send(self.message_queue.pop(0)) + async def api_handler(self, websocket): + try: + async for message in websocket: + self.get_logger().info(f"Received message: {message}") + websocket.send(message) + # while True: + # if not self.checking_for_message: + # self.get_logger().info('Waiting for message') + # self.checking_for_message = True + # task = asyncio.create_task(self.handle_message_receive(websocket)) + # task.add_done_callback(self.message_received_callback) + # if len(self.message_queue) > 0: + # websocket.send(self.message_queue.pop(0)) - # except websockets.exceptions.ConnectionClosed: - # self.get_logger().info('Connection closed') + except websockets.exceptions.ConnectionClosed: + self.get_logger().info('Connection closed') def main(args=None):