add tests for px4 failsafe

This commit is contained in:
Sem van der Hoeven
2023-06-07 17:16:20 +02:00
parent 9b05ac0fa0
commit ebf4264ae3
2 changed files with 181 additions and 9 deletions

View File

@@ -102,17 +102,17 @@ private:
char current_control_mode = CONTROL_MODE_ATTITUDE; // start with attitude control
const float omega = 0.3; // angular speed of the POLAR trajectory
const float K = 0; // [m] gain that regulates the spiral pitch
const float omega = 0.3; // angular speed of the POLAR trajectory
const float K = 0; // [m] gain that regulates the spiral pitch
float rho_0 = 0; // initial rho of polar coordinate
float theta_0 = 0; // initial theta of polar coordinate
float p0_z = -0.0; // initial height
float rho_0 = 0; // initial rho of polar coordinate
float theta_0 = 0; // initial theta of polar coordinate
float p0_z = -0.0; // initial height
float des_x = 0.0, des_y = 0.0, des_z = p0_z; // desired position
float gamma = M_PI_4; // desired heading direction
float gamma = M_PI_4; // desired heading direction
float local_x = 0; // local position x
float local_y = 0; // local position y
float local_x = 0; // local position x
float local_y = 0; // local position y
bool failsafe_enabled = false;
@@ -130,6 +130,12 @@ private:
const std::shared_ptr<drone_services::srv::SetAttitude::Request> request,
const std::shared_ptr<drone_services::srv::SetAttitude::Response> response)
{
if (this->failsafe_enabled)
{
RCLCPP_INFO(this->get_logger(), "Failsafe enabled, ignoring attitude setpoint");
response->success = false;
return;
}
if (armed)
{
if (request->yaw == 0 && request->pitch == 0 && request->roll == 0 && request->thrust == 0)
@@ -179,6 +185,12 @@ private:
const std::shared_ptr<drone_services::srv::SetTrajectory::Request> request,
const std::shared_ptr<drone_services::srv::SetTrajectory::Response> response)
{
if (this->failsafe_enabled)
{
RCLCPP_INFO(this->get_logger(), "Failsafe enabled, ignoring trajectory setpoint");
response->success = false;
return;
}
if (!(request->control_mode == CONTROL_MODE_VELOCITY || request->control_mode == CONTROL_MODE_POSITION))
{
RCLCPP_INFO(this->get_logger(), "Got invalid trajectory control mode: %d", request->control_mode);
@@ -490,7 +502,7 @@ private:
/**
* @brief Callback function for receiving failsafe messages
*
*
* @param msg the message indicating that the failsafe was enabled
*/
void on_failsafe_receive(const drone_services::msg::FailsafeMsg::SharedPtr msg)

View File

@@ -0,0 +1,160 @@
import os
import sys
import unittest
import launch
import launch_ros
import launch_ros.actions
import launch_testing.actions
import pytest
import rclpy
from rclpy.node import Node
from drone_services.srv import EnableFailsafe
from drone_services.srv import SetTrajectory
from drone_services.srv import SetAttitude
from drone_services.srv import ArmDrone
@pytest.mark.rostest
def generate_test_description():
file_path = os.path.dirname(__file__)
px4_controller_node = Node(
package='px4_connection', executable='px4_controller')
return (
launch.LaunchDescription([
px4_controller_node,
launch_testing.actions.ReadyToTest(),
]),
{
'px4_controller_node': px4_controller_node,
}
)
class TestPx4Failsafe(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()
@classmethod
def tearDownClass(cls):
rclpy.shutdown()
def setUp(self):
self.node = rclpy.create_node('test_px4_failsafe')
self.called_failsafe_service = False
self.called_trajectory_service = False
self.called_arm_service = False
def tearDown(self):
self.node.destroy_node()
def failsafe_service_callback(self,future):
self.called_failsafe_service = True
def trajectory_service_callback(self,future):
self.called_trajectory_service = True
self.assertFalse(future.result().success)
def attitude_service_callback(self,future):
self.called_attitude_service = True
self.assertFalse(future.result().success)
def arm_service_callback(self,future):
self.called_arm_service = True
self.assertFalse(future.result().success)
def test_failsafe_enabled_set_trajectory_returns_false(self, px4_controller_node, proc_output):
self.called_failsafe_service = False
failsafe_client = self.node.create_client(EnableFailsafe, '/drone/enable_failsafe')
set_trajectory_client = self.node.create_client(SetTrajectory, '/drone/set_trajectory')
while not failsafe_client.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
while not set_trajectory_client.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
failsafe_request = EnableFailsafe.Request()
failsafe_request.message = "test"
set_trajectory_request = SetTrajectory.Request()
set_trajectory_request.control_mode = 2
set_trajectory_request.values = [1,1,1]
set_trajectory_request.yaw = 0
try:
while True:
rclpy.spin_once(self.node, timeout_sec=0.1)
if not self.called_failsafe_service:
failsafe_future = failsafe_client.call_async(failsafe_request)
failsafe_future.add_done_callback(self.failsafe_service_callback)
elif not self.called_trajectory_service:
trajectory_future = set_trajectory_client.call_async(set_trajectory_request)
trajectory_future.add_done_callback(self.trajectory_service_callback)
else:
break
finally:
self.node.destroy_client(failsafe_client)
self.node.destroy_client(set_trajectory_client)
def test_failsafe_enabled_set_attitude_returns_false(self, px4_controller_node, proc_output):
self.called_failsafe_service = False
failsafe_client = self.node.create_client(EnableFailsafe, '/drone/enable_failsafe')
set_attitude_client = self.node.create_client(SetAttitude, '/drone/set_attitude')
while not failsafe_client.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
while not set_attitude_client.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
failsafe_request = EnableFailsafe.Request()
failsafe_request.message = "test"
set_attitude_request = SetAttitude.Request()
set_attitude_request.control_mode = 1
set_attitude_request.pitch = 1
set_attitude_request.yaw = 1
set_attitude_request.roll = 1
set_attitude_request.thrust = 0.5
try:
while True:
rclpy.spin_once(self.node, timeout_sec=0.1)
if not self.called_failsafe_service:
failsafe_future = failsafe_client.call_async(failsafe_request)
failsafe_future.add_done_callback(self.failsafe_service_callback)
elif not self.called_attitude_service:
attitude_future = set_attitude_client.call_async(set_attitude_request)
attitude_future.add_done_callback(self.attitude_service_callback)
else:
break
finally:
self.node.destroy_client(failsafe_client)
self.node.destroy_client(set_attitude_client)
def test_failsafe_enabled_arm_returns_false(self, px4_controller_node, proc_output):
self.called_failsafe_service = False
failsafe_client = self.node.create_client(EnableFailsafe, '/drone/enable_failsafe')
arm_client = self.node.create_client(ArmDrone, '/drone/arm')
while not failsafe_client.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
while not arm_client.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
failsafe_request = EnableFailsafe.Request()
failsafe_request.message = "test"
arm_request = ArmDrone.Request()
try:
while True:
rclpy.spin_once(self.node, timeout_sec=0.1)
if not self.called_failsafe_service:
failsafe_future = failsafe_client.call_async(failsafe_request)
failsafe_future.add_done_callback(self.failsafe_service_callback)
elif not self.called_arm_service:
arm_future = arm_client.call_async(arm_request)
arm_future.add_done_callback(self.arm_service_callback)
else:
break
finally
self.node.destroy_client(failsafe_client)
self.node.destroy_client(arm_client)