try with consumer

This commit is contained in:
Sem van der Hoeven
2023-05-22 16:36:15 +02:00
parent 9f2f47aac4
commit 5740e14fac

View File

@@ -35,35 +35,36 @@ class ApiListener(Node):
async def handle_api(self): async def handle_api(self):
self.get_logger().info('Starting API') self.get_logger().info('Starting API')
self.server = await websockets.serve(self.api_handler, '0.0.0.0', 9001) self.websocket = websockets.connect('ws://100.100.100.106:9001')
self.get_logger().info('API started') self.get_logger().info('API started')
await self.server.wait_closed() async for message in self.websocket:
self.get_logger().info(f"Received message: {message}")
await self.websocket.send(message)
async def handle_message_receive(self,websocket): # async def handle_message_receive(self,websocket):
self.get_logger().info(f"Received message: {self.last_message}") # self.last_message = await websocket.recv()
self.last_message = await websocket.recv()
def message_received_callback(self): # def message_received_callback(self):
self.get_logger().info(f"Received message callback: {self.last_message}") # self.get_logger().info(f"Received message callback: {self.last_message}")
self.message_queue.append(self.last_message) # self.message_queue.append(self.last_message)
self.checking_for_message = False # self.checking_for_message = False
# def handle_message(self, message): # def handle_message(self, message):
# deserialized_msg = json.loads(message) # deserialized_msg = json.loads(message)
async def api_handler(self, websocket): # async def api_handler(self, websocket):
try: # try:
while True: # while True:
if not self.checking_for_message: # if not self.checking_for_message:
self.get_logger().info('Waiting for message') # self.get_logger().info('Waiting for message')
self.checking_for_message = True # self.checking_for_message = True
task = asyncio.create_task(self.handle_message_receive(websocket)) # task = asyncio.create_task(self.handle_message_receive(websocket))
task.add_done_callback(self.message_received_callback) # task.add_done_callback(self.message_received_callback)
if len(self.message_queue) > 0: # if len(self.message_queue) > 0:
websocket.send(self.message_queue.pop(0)) # websocket.send(self.message_queue.pop(0))
except websockets.exceptions.ConnectionClosed: # except websockets.exceptions.ConnectionClosed:
self.get_logger().info('Connection closed') # self.get_logger().info('Connection closed')
def main(args=None): def main(args=None):