From e706a38ea44669b50939b67a875ea2b3950f510d Mon Sep 17 00:00:00 2001 From: Sem van der Hoeven Date: Mon, 12 Jun 2023 11:31:22 +0200 Subject: [PATCH] add test for moving away with lidar --- .../test/test_positionchanger.py | 94 ++++++++++++++----- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/src/drone_controls/test/test_positionchanger.py b/src/drone_controls/test/test_positionchanger.py index 2f74156c..10f1320b 100644 --- a/src/drone_controls/test/test_positionchanger.py +++ b/src/drone_controls/test/test_positionchanger.py @@ -18,15 +18,15 @@ from drone_services.msg import LidarReading @pytest.mark.rostest def generate_test_description(): file_path = os.path.dirname(__file__) - #device under test + # device under test positionchanger_node = launch_ros.actions.Node( package='drone_controls', executable='position_changer') failsafe_node = launch_ros.actions.Node( - package='failsafe',executable='failsafe') + package='failsafe', executable='failsafe') px4_controller_node = launch_ros.actions.Node( - package='px4_connection',executable='px4_controller') + package='px4_connection', executable='px4_controller') heartbeat_node = launch_ros.actions.Node( - package='px4_connection',executable='heartbeat') + package='px4_connection', executable='heartbeat') return ( launch.LaunchDescription([ @@ -44,6 +44,7 @@ def generate_test_description(): } ) + class TestPositionChanger(unittest.TestCase): @classmethod @@ -57,25 +58,27 @@ class TestPositionChanger(unittest.TestCase): def setUp(self): self.node = rclpy.create_node('test_positionchanger') self.called_positionchanger_service = False - self.received_failsafe_callback = False - + self.received_failsafe_callback = False def tearDown(self): self.node.destroy_node() - def failsafe_callback(self,msg): + def failsafe_callback(self, msg): self.assertTrue(msg.enabled, "Failsafe was not enabled!") self.received_failsafe_callback = True - def move_position_callback(self,future): - self.assertFalse(future.result().success, "MovePosition service call was successful, but should have failed!") + def move_position_callback(self, future): + self.assertFalse(future.result( + ).success, "MovePosition service call was successful, but should have failed!") self.called_positionchanger_service = True - def test_positionchanger_no_lidar_data(self,positionchanger_node,proc_output): + def test_positionchanger_no_lidar_data(self, positionchanger_node, proc_output): self.received_failsafe_callback = False self.called_positionchanger_service = False - failsafe_subscriber = self.node.create_subscription(FailsafeMsg,'/drone/failsafe',self.failsafe_callback,10) - move_position_client = self.node.create_client(MovePosition,'/drone/move_position') + failsafe_subscriber = self.node.create_subscription( + FailsafeMsg, '/drone/failsafe', self.failsafe_callback, 10) + move_position_client = self.node.create_client( + MovePosition, '/drone/move_position') while not move_position_client.wait_for_service(timeout_sec=1.0): self.node.get_logger().info('move_position service not available, waiting again...') @@ -96,22 +99,25 @@ class TestPositionChanger(unittest.TestCase): continue else: break - self.assertTrue(self.received_failsafe_callback, "Failsafe callback was not received!") - self.assertTrue(self.called_positionchanger_service, "MovePosition service was not called!") + self.assertTrue(self.received_failsafe_callback, + "Failsafe callback was not received!") + self.assertTrue(self.called_positionchanger_service, + "MovePosition service was not called!") finally: self.node.destroy_client(move_position_client) self.node.destroy_subscription(failsafe_subscriber) - - - def test_positionchanger_lidar_stops(self,positionchanger_node,proc_output): + def test_positionchanger_lidar_stops(self, positionchanger_node, proc_output): self.node.get_logger().info("STARTING TEST test_positionchanger_lidar_stops") self.received_failsafe_callback = False self.called_positionchanger_service = False - failsafe_subscriber = self.node.create_subscription(FailsafeMsg,'/drone/failsafe',self.failsafe_callback,10) - lidar_publisher = self.node.create_publisher(LidarReading,'/drone/object_detection',10) - move_position_client = self.node.create_client(MovePosition,'/drone/move_position') - + failsafe_subscriber = self.node.create_subscription( + FailsafeMsg, '/drone/failsafe', self.failsafe_callback, 10) + lidar_publisher = self.node.create_publisher( + LidarReading, '/drone/object_detection', 10) + move_position_client = self.node.create_client( + MovePosition, '/drone/move_position') + while not move_position_client.wait_for_service(timeout_sec=1.0): self.node.get_logger().info('move_position service not available, waiting again...') @@ -126,19 +132,19 @@ class TestPositionChanger(unittest.TestCase): lidar_msg.sensor_2 = 2.0 lidar_msg.sensor_3 = 2.0 lidar_msg.sensor_4 = 2.0 - lidar_msg.imu_data = [1.0,1.0,1.0,1.0] + lidar_msg.imu_data = [1.0, 1.0, 1.0, 1.0] sent_lidar_msg = False # wait for nodes to become active time.sleep(3) - + # wait 5 seconds for the failsafe to trigger wait_time = time.time() + 5.0 end_time = time.time() + 10.0 try: self.node.get_logger().info('STARTING WHILE LOOP') while time.time() < end_time: - rclpy.spin_once(self.node,timeout_sec=0.1) + rclpy.spin_once(self.node, timeout_sec=0.1) if not sent_lidar_msg: lidar_publisher.publish(lidar_msg) sent_lidar_msg = True @@ -150,11 +156,47 @@ class TestPositionChanger(unittest.TestCase): elif not self.received_failsafe_callback: continue self.node.get_logger().info('END OF WHILE LOOP') - self.assertTrue(self.called_positionchanger_service, "MovePosition service was not called!") - self.assertTrue(self.received_failsafe_callback, "Failsafe was not activated!") + self.assertTrue(self.called_positionchanger_service, + "MovePosition service was not called!") + self.assertTrue(self.received_failsafe_callback, + "Failsafe was not activated!") finally: self.node.get_logger().info('Cleaning up') self.node.destroy_client(move_position_client) self.node.destroy_subscription(failsafe_subscriber) self.node.destroy_publisher(lidar_publisher) + + def test_positionchanger_lidar_moves_away(self, positionchanger_node, px4_controller_node, proc_output): + self.node.get_logger().info("STARTING TEST test_positionchanger_lidar_moves_away") + lidar_publisher = self.node.create_publisher( + LidarReading, '/drone/object_detection', 10) + move_position_client = self.node.create_client( + MovePosition, '/drone/move_position') + while not move_position_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('move_position service not available, waiting again...') + request = MovePosition.Request() + request.front_back = 1.0 + request.left_right = 0.0 + request.up_down = 0.0 + request.angle = 0.0 + + lidar_msg = LidarReading() + lidar_msg.sensor_1 = 0.5 + lidar_msg.sensor_2 = 2.0 + lidar_msg.sensor_3 = 2.0 + lidar_msg.sensor_4 = 2.0 + lidar_msg.imu_data = [1.0, 1.0, 1.0, 1.0] + end_time = time.time() + 10.0 + + try: + while time.time() < end_time: + rclpy.spin_once(self.node, timeout_sec=0.1) + lidar_publisher.publish(lidar_msg) + if not self.called_positionchanger_service: + future = move_position_client.call_async(request) + future.add_done_callback(self.move_position_callback) + proc_output.assertWaitFor(expected_output='0.5',process=px4_controller_node) + finally: + self.node.destroy_client(move_position_client) + self.node.destroy_publisher(lidar_publisher) \ No newline at end of file