From a740fa7c45ed3cb472af8a81541a4631619fb9f6 Mon Sep 17 00:00:00 2001 From: Sem van der Hoeven Date: Wed, 7 Jun 2023 23:19:26 +0200 Subject: [PATCH] add positiionchanger tests --- src/drone_controls/CMakeLists.txt | 24 +-- .../test/test_positionchanger.py | 156 ++++++++++++++++++ src/failsafe/test/test_failsafe.py | 4 +- .../test/test_failsafe_enabled.py | 16 +- 4 files changed, 186 insertions(+), 14 deletions(-) create mode 100644 src/drone_controls/test/test_positionchanger.py diff --git a/src/drone_controls/CMakeLists.txt b/src/drone_controls/CMakeLists.txt index 4eb03b6f..3093da56 100644 --- a/src/drone_controls/CMakeLists.txt +++ b/src/drone_controls/CMakeLists.txt @@ -2,11 +2,12 @@ cmake_minimum_required(VERSION 3.8) project(drone_controls) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) + add_compile_options(-Wall -Wextra -Wpedantic) endif() # find dependencies find_package(ament_cmake REQUIRED) + # uncomment the following section in order to fill in # further dependencies manually. # find_package( REQUIRED) @@ -25,15 +26,18 @@ install(TARGETS position_changer DESTINATION lib/${PROJECT_NAME}) if(BUILD_TESTING) - find_package(ament_lint_auto REQUIRED) - # the following line skips the linter which checks for copyrights - # comment the line when a copyright and license is added to all source files - set(ament_cmake_copyright_FOUND TRUE) - # the following line skips cpplint (only works in a git repo) - # comment the line when this package is in a git repo and when - # a copyright and license is added to all source files - set(ament_cmake_cpplint_FOUND TRUE) - ament_lint_auto_find_test_dependencies() + # find_package(ament_lint_auto REQUIRED) + + # the following line skips the linter which checks for copyrights + # comment the line when a copyright and license is added to all source files + # set(ament_cmake_copyright_FOUND TRUE) + # the following line skips cpplint (only works in a git repo) + # comment the line when this package is in a git repo and when + # a copyright and license is added to all source files + # set(ament_cmake_cpplint_FOUND TRUE) + # ament_lint_auto_find_test_dependencies() + find_package(launch_testing_ament_cmake REQUIRED) + add_launch_test(test/test_failsafe_enabled.py) endif() ament_package() diff --git a/src/drone_controls/test/test_positionchanger.py b/src/drone_controls/test/test_positionchanger.py new file mode 100644 index 00000000..19da73e8 --- /dev/null +++ b/src/drone_controls/test/test_positionchanger.py @@ -0,0 +1,156 @@ +import os +import sys +import unittest + +import launch +import launch_ros +import launch_ros.actions +import launch_testing.actions +import pytest +import rclpy +import time + +from drone_services.srv import MovePosition +from drone_services.msg import FailsafeMsg +from drone_services.msg import LidarReading + + +@pytest.mark.rostest +def generate_test_description(): + file_path = os.path.dirname(__file__) + #device under test + positionchanger_node = launch_ros.actions.Node( + package='drone_controls', executable='positionchanger') + failsafe_node = launch_ros.actions.Node( + package='failsafe',executable='failsafe') + + return ( + launch.LaunchDescription([ + positionchanger_node, + failsafe_node, + launch_testing.actions.ReadyToTest(), + ]), + { + 'positionchanger_node': positionchanger_node, + 'failsafe_node': failsafe_node, + } + ) + +class TestPositionChanger(unittest.TestCase): + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node('test_positionchanger') + self.called_positionchanger_service = False + self.received_failsafe_callback = False + self.failsafe_reason_timeout = "Waiting for LIDAR timed out! Check the LIDAR USB connection and consider restarting the drone." + self.failsafe_reason_nodata = "No healthy connection to LIDAR! Check the LIDAR USB cable and restart the drone." + + + def tearDown(self): + self.node.destroy_node() + + def failsafe_callback_nodata(self,msg): + self.assertTrue(msg.enabled, "Failsafe was not enabled!") + self.assertEqual(msg.msg,self.failsafe_reason_nodata, "Failsafe message was not correct!") + self.received_failsafe_callback = True + + def failsafe_callback_timeout(self,msg): + self.assertTrue(msg.enabled, "Failsafe was not enabled!") + self.assertEqual(msg.msg,self.failsafe_reason_timeout, "Failsafe message was not correct!") + self.received_failsafe_callback = True + + def move_position_callback(self,future): + self.assertFalse(future.result().success, "MovePosition service call was successful, but should have failed!") + self.called_positionchanger_service = True + + def test_positionchanger_no_lidar_data(self,positionchanger_node,heartbeat_node): + self.received_failsafe_callback = False + self.called_positionchanger_service = False + failsafe_subscriber = self.node.create_subscription(FailsafeMsg,'/drone/failsafe',self.failsafe_callback_nodata,10) + move_position_client = self.node.create_client(MovePosition,'/drone/move_position') + while not move_position_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('move_position service not available, waiting again...') + + request = MovePosition.Request() + request.front_back = 1.0 + request.left_right = 1.0 + request.up_down = 1.0 + request.yaw = 1.0 + + end_time = time.time() + 10.0 + try: + while time.time() < end_time: + rclpy.spin_once(self.node) + if not self.called_positionchanger_service: + future = move_position_client.call_async(request) + future.add_done_callback(self.move_position_callback) + elif not self.received_failsafe_callback: + continue + else: + break + self.assertTrue(self.received_failsafe_callback, "Failsafe callback was not received!") + self.assertTrue(self.called_positionchanger_service, "MovePosition service was not called!") + finally: + self.node.destroy_client(move_position_client) + self.node.destroy_subscription(failsafe_subscriber) + + + + def test_positionchanger_lidar_stops(self,positionchanger_node,heartbeat_node): + self.received_failsafe_callback = False + self.called_positionchanger_service = False + failsafe_subscriber = self.node.create_subscription(FailsafeMsg,'/drone/failsafe',self.failsafe_callback_timeout,10) + lidar_publisher = self.node.create_publisher(LidarReading,'/drone/object_detection',10) + move_position_client = self.node.create_client(MovePosition,'/drone/move_position') + + while not move_position_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('move_position service not available, waiting again...') + + request = MovePosition.Request() + request.front_back = 1.0 + request.left_right = 1.0 + request.up_down = 1.0 + request.yaw = 1.0 + + lidar_msg = LidarReading() + lidar_msg.sensor_1 = 2.0 + lidar_msg.sensor_2 = 2.0 + lidar_msg.sensor_3 = 2.0 + lidar_msg.sensor_4 = 2.0 + lidar_msg.imu_data = [1.0,1.0,1.0,1.0] + lidar_messages_published = 0 + + + # wait 5 seconds for the failsafe to trigger + wait_time = time.time() + 5.0 + end_time = time.time() + 10.0 + try: + while time.time() < end_time: + rclpy.spin_once(self.node) + # send 5 lidar messages and after that stop + if (lidar_messages_published < 5): + lidar_publisher.publish(lidar_msg) + lidar_messages_published += 1 + # wait 5 seconds before sending the move_position request + if not self.called_positionchanger_service and time.time() > wait_time: + future = move_position_client.call_async(request) + future.add_done_callback(self.move_position_callback) + elif not self.received_failsafe_callback: + continue + break + + self.assertTrue(self.called_positionchanger_service, "MovePosition service was not called!") + self.assertTrue(self.received_failsafe_callback, "Failsafe was not activated!") + + finally: + self.node.destroy_client(move_position_client) + self.node.destroy_subscription(failsafe_subscriber) + self.node.destroy_publisher(lidar_publisher) diff --git a/src/failsafe/test/test_failsafe.py b/src/failsafe/test/test_failsafe.py index 33ec673a..25c1b6b2 100644 --- a/src/failsafe/test/test_failsafe.py +++ b/src/failsafe/test/test_failsafe.py @@ -65,8 +65,10 @@ class FailsafeUnitTest(unittest.TestCase): request = EnableFailsafe.Request() request.message = "test" + end_time = time.time() + 10.0 + try: - while True: + while time.time() < end_time: rclpy.spin_once(self.node, timeout_sec=0.1) if (not self.service_called): future = failsafe_client.call_async(request) diff --git a/src/px4_connection/test/test_failsafe_enabled.py b/src/px4_connection/test/test_failsafe_enabled.py index 0251c822..11aeca3a 100644 --- a/src/px4_connection/test/test_failsafe_enabled.py +++ b/src/px4_connection/test/test_failsafe_enabled.py @@ -8,6 +8,7 @@ import launch_ros.actions import launch_testing.actions import pytest import rclpy +import time from drone_services.srv import EnableFailsafe from drone_services.srv import SetTrajectory @@ -86,8 +87,9 @@ class TestPx4Failsafe(unittest.TestCase): set_trajectory_request.values = [1.0,1.0,1.0] set_trajectory_request.yaw = 0.0 + end_time = time.time() + 10.0 try: - while True: + while time.time() < end_time: rclpy.spin_once(self.node, timeout_sec=0.1) if not self.called_failsafe_service: failsafe_future = failsafe_client.call_async(failsafe_request) @@ -97,6 +99,8 @@ class TestPx4Failsafe(unittest.TestCase): trajectory_future.add_done_callback(self.trajectory_service_callback) else: break + self.assertTrue(self.called_failsafe_service, "Failsafe service was not called") + self.assertTrue(self.called_trajectory_service, "Trajectory service was not called") finally: self.node.destroy_client(failsafe_client) @@ -119,8 +123,9 @@ class TestPx4Failsafe(unittest.TestCase): set_attitude_request.roll = 1.0 set_attitude_request.thrust = 0.5 + end_time = time.time() + 10.0 try: - while True: + while time.time() < end_time: rclpy.spin_once(self.node, timeout_sec=0.1) if not self.called_failsafe_service: failsafe_future = failsafe_client.call_async(failsafe_request) @@ -130,6 +135,8 @@ class TestPx4Failsafe(unittest.TestCase): attitude_future.add_done_callback(self.attitude_service_callback) else: break + self.assertTrue(self.called_failsafe_service, "Failsafe service was not called") + self.assertTrue(self.called_attitude_service, "Attitude service was not called") finally: self.node.destroy_client(failsafe_client) @@ -147,8 +154,9 @@ class TestPx4Failsafe(unittest.TestCase): failsafe_request.message = "test" arm_request = ArmDrone.Request() + end_time = time.time() + 10.0 try: - while True: + while time.time() < end_time: rclpy.spin_once(self.node, timeout_sec=0.1) if not self.called_failsafe_service: failsafe_future = failsafe_client.call_async(failsafe_request) @@ -158,6 +166,8 @@ class TestPx4Failsafe(unittest.TestCase): arm_future.add_done_callback(self.arm_service_callback) else: break + self.assertTrue(self.called_failsafe_service, "Failsafe service was not called") + self.assertTrue(self.called_arm_service, "Arm service was not called") finally: self.node.destroy_client(failsafe_client) self.node.destroy_client(arm_client) \ No newline at end of file