From ebf4264ae3295d8f9c57677dc88a8711b9adce1f Mon Sep 17 00:00:00 2001 From: Sem van der Hoeven Date: Wed, 7 Jun 2023 17:16:20 +0200 Subject: [PATCH] add tests for px4 failsafe --- src/px4_connection/src/px4_controller.cpp | 30 +++- .../test/test_failsafe_enabled.py | 160 ++++++++++++++++++ 2 files changed, 181 insertions(+), 9 deletions(-) diff --git a/src/px4_connection/src/px4_controller.cpp b/src/px4_connection/src/px4_controller.cpp index 28dfe2ac..7c0d8930 100644 --- a/src/px4_connection/src/px4_controller.cpp +++ b/src/px4_connection/src/px4_controller.cpp @@ -102,17 +102,17 @@ private: char current_control_mode = CONTROL_MODE_ATTITUDE; // start with attitude control - const float omega = 0.3; // angular speed of the POLAR trajectory - const float K = 0; // [m] gain that regulates the spiral pitch + const float omega = 0.3; // angular speed of the POLAR trajectory + const float K = 0; // [m] gain that regulates the spiral pitch - float rho_0 = 0; // initial rho of polar coordinate - float theta_0 = 0; // initial theta of polar coordinate - float p0_z = -0.0; // initial height + float rho_0 = 0; // initial rho of polar coordinate + float theta_0 = 0; // initial theta of polar coordinate + float p0_z = -0.0; // initial height float des_x = 0.0, des_y = 0.0, des_z = p0_z; // desired position - float gamma = M_PI_4; // desired heading direction + float gamma = M_PI_4; // desired heading direction - float local_x = 0; // local position x - float local_y = 0; // local position y + float local_x = 0; // local position x + float local_y = 0; // local position y bool failsafe_enabled = false; @@ -130,6 +130,12 @@ private: const std::shared_ptr request, const std::shared_ptr response) { + if (this->failsafe_enabled) + { + RCLCPP_INFO(this->get_logger(), "Failsafe enabled, ignoring attitude setpoint"); + response->success = false; + return; + } if (armed) { if (request->yaw == 0 && request->pitch == 0 && request->roll == 0 && request->thrust == 0) @@ -179,6 +185,12 @@ private: const std::shared_ptr request, const std::shared_ptr response) { + if (this->failsafe_enabled) + { + RCLCPP_INFO(this->get_logger(), "Failsafe enabled, ignoring trajectory setpoint"); + response->success = false; + return; + } if (!(request->control_mode == CONTROL_MODE_VELOCITY || request->control_mode == CONTROL_MODE_POSITION)) { RCLCPP_INFO(this->get_logger(), "Got invalid trajectory control mode: %d", request->control_mode); @@ -490,7 +502,7 @@ private: /** * @brief Callback function for receiving failsafe messages - * + * * @param msg the message indicating that the failsafe was enabled */ void on_failsafe_receive(const drone_services::msg::FailsafeMsg::SharedPtr msg) diff --git a/src/px4_connection/test/test_failsafe_enabled.py b/src/px4_connection/test/test_failsafe_enabled.py index e69de29b..aaee247c 100644 --- a/src/px4_connection/test/test_failsafe_enabled.py +++ b/src/px4_connection/test/test_failsafe_enabled.py @@ -0,0 +1,160 @@ +import os +import sys +import unittest + +import launch +import launch_ros +import launch_ros.actions +import launch_testing.actions +import pytest +import rclpy +from rclpy.node import Node + +from drone_services.srv import EnableFailsafe +from drone_services.srv import SetTrajectory +from drone_services.srv import SetAttitude +from drone_services.srv import ArmDrone + + +@pytest.mark.rostest +def generate_test_description(): + file_path = os.path.dirname(__file__) + px4_controller_node = Node( + package='px4_connection', executable='px4_controller') + + return ( + launch.LaunchDescription([ + px4_controller_node, + launch_testing.actions.ReadyToTest(), + ]), + { + 'px4_controller_node': px4_controller_node, + } + ) + + +class TestPx4Failsafe(unittest.TestCase): + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node('test_px4_failsafe') + self.called_failsafe_service = False + self.called_trajectory_service = False + self.called_arm_service = False + + def tearDown(self): + self.node.destroy_node() + + def failsafe_service_callback(self,future): + self.called_failsafe_service = True + + def trajectory_service_callback(self,future): + self.called_trajectory_service = True + self.assertFalse(future.result().success) + + def attitude_service_callback(self,future): + self.called_attitude_service = True + self.assertFalse(future.result().success) + + def arm_service_callback(self,future): + self.called_arm_service = True + self.assertFalse(future.result().success) + + def test_failsafe_enabled_set_trajectory_returns_false(self, px4_controller_node, proc_output): + self.called_failsafe_service = False + failsafe_client = self.node.create_client(EnableFailsafe, '/drone/enable_failsafe') + set_trajectory_client = self.node.create_client(SetTrajectory, '/drone/set_trajectory') + while not failsafe_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('service not available, waiting again...') + while not set_trajectory_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('service not available, waiting again...') + failsafe_request = EnableFailsafe.Request() + failsafe_request.message = "test" + set_trajectory_request = SetTrajectory.Request() + set_trajectory_request.control_mode = 2 + set_trajectory_request.values = [1,1,1] + set_trajectory_request.yaw = 0 + + try: + while True: + rclpy.spin_once(self.node, timeout_sec=0.1) + if not self.called_failsafe_service: + failsafe_future = failsafe_client.call_async(failsafe_request) + failsafe_future.add_done_callback(self.failsafe_service_callback) + elif not self.called_trajectory_service: + trajectory_future = set_trajectory_client.call_async(set_trajectory_request) + trajectory_future.add_done_callback(self.trajectory_service_callback) + else: + break + + finally: + self.node.destroy_client(failsafe_client) + self.node.destroy_client(set_trajectory_client) + + + def test_failsafe_enabled_set_attitude_returns_false(self, px4_controller_node, proc_output): + self.called_failsafe_service = False + failsafe_client = self.node.create_client(EnableFailsafe, '/drone/enable_failsafe') + set_attitude_client = self.node.create_client(SetAttitude, '/drone/set_attitude') + while not failsafe_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('service not available, waiting again...') + while not set_attitude_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('service not available, waiting again...') + failsafe_request = EnableFailsafe.Request() + failsafe_request.message = "test" + set_attitude_request = SetAttitude.Request() + set_attitude_request.control_mode = 1 + set_attitude_request.pitch = 1 + set_attitude_request.yaw = 1 + set_attitude_request.roll = 1 + set_attitude_request.thrust = 0.5 + + try: + while True: + rclpy.spin_once(self.node, timeout_sec=0.1) + if not self.called_failsafe_service: + failsafe_future = failsafe_client.call_async(failsafe_request) + failsafe_future.add_done_callback(self.failsafe_service_callback) + elif not self.called_attitude_service: + attitude_future = set_attitude_client.call_async(set_attitude_request) + attitude_future.add_done_callback(self.attitude_service_callback) + else: + break + + finally: + self.node.destroy_client(failsafe_client) + self.node.destroy_client(set_attitude_client) + + def test_failsafe_enabled_arm_returns_false(self, px4_controller_node, proc_output): + self.called_failsafe_service = False + failsafe_client = self.node.create_client(EnableFailsafe, '/drone/enable_failsafe') + arm_client = self.node.create_client(ArmDrone, '/drone/arm') + while not failsafe_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('service not available, waiting again...') + while not arm_client.wait_for_service(timeout_sec=1.0): + self.node.get_logger().info('service not available, waiting again...') + failsafe_request = EnableFailsafe.Request() + failsafe_request.message = "test" + arm_request = ArmDrone.Request() + + try: + while True: + rclpy.spin_once(self.node, timeout_sec=0.1) + if not self.called_failsafe_service: + failsafe_future = failsafe_client.call_async(failsafe_request) + failsafe_future.add_done_callback(self.failsafe_service_callback) + elif not self.called_arm_service: + arm_future = arm_client.call_async(arm_request) + arm_future.add_done_callback(self.arm_service_callback) + else: + break + finally + self.node.destroy_client(failsafe_client) + self.node.destroy_client(arm_client) \ No newline at end of file