add positiionchanger tests
This commit is contained in:
@@ -2,11 +2,12 @@ cmake_minimum_required(VERSION 3.8)
|
||||
project(drone_controls)
|
||||
|
||||
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
|
||||
add_compile_options(-Wall -Wextra -Wpedantic)
|
||||
add_compile_options(-Wall -Wextra -Wpedantic)
|
||||
endif()
|
||||
|
||||
# find dependencies
|
||||
find_package(ament_cmake REQUIRED)
|
||||
|
||||
# uncomment the following section in order to fill in
|
||||
# further dependencies manually.
|
||||
# find_package(<dependency> REQUIRED)
|
||||
@@ -25,15 +26,18 @@ install(TARGETS position_changer
|
||||
DESTINATION lib/${PROJECT_NAME})
|
||||
|
||||
if(BUILD_TESTING)
|
||||
find_package(ament_lint_auto REQUIRED)
|
||||
# the following line skips the linter which checks for copyrights
|
||||
# comment the line when a copyright and license is added to all source files
|
||||
set(ament_cmake_copyright_FOUND TRUE)
|
||||
# the following line skips cpplint (only works in a git repo)
|
||||
# comment the line when this package is in a git repo and when
|
||||
# a copyright and license is added to all source files
|
||||
set(ament_cmake_cpplint_FOUND TRUE)
|
||||
ament_lint_auto_find_test_dependencies()
|
||||
# find_package(ament_lint_auto REQUIRED)
|
||||
|
||||
# the following line skips the linter which checks for copyrights
|
||||
# comment the line when a copyright and license is added to all source files
|
||||
# set(ament_cmake_copyright_FOUND TRUE)
|
||||
# the following line skips cpplint (only works in a git repo)
|
||||
# comment the line when this package is in a git repo and when
|
||||
# a copyright and license is added to all source files
|
||||
# set(ament_cmake_cpplint_FOUND TRUE)
|
||||
# ament_lint_auto_find_test_dependencies()
|
||||
find_package(launch_testing_ament_cmake REQUIRED)
|
||||
add_launch_test(test/test_failsafe_enabled.py)
|
||||
endif()
|
||||
|
||||
ament_package()
|
||||
|
||||
156
src/drone_controls/test/test_positionchanger.py
Normal file
156
src/drone_controls/test/test_positionchanger.py
Normal file
@@ -0,0 +1,156 @@
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import launch
|
||||
import launch_ros
|
||||
import launch_ros.actions
|
||||
import launch_testing.actions
|
||||
import pytest
|
||||
import rclpy
|
||||
import time
|
||||
|
||||
from drone_services.srv import MovePosition
|
||||
from drone_services.msg import FailsafeMsg
|
||||
from drone_services.msg import LidarReading
|
||||
|
||||
|
||||
@pytest.mark.rostest
|
||||
def generate_test_description():
|
||||
file_path = os.path.dirname(__file__)
|
||||
#device under test
|
||||
positionchanger_node = launch_ros.actions.Node(
|
||||
package='drone_controls', executable='positionchanger')
|
||||
failsafe_node = launch_ros.actions.Node(
|
||||
package='failsafe',executable='failsafe')
|
||||
|
||||
return (
|
||||
launch.LaunchDescription([
|
||||
positionchanger_node,
|
||||
failsafe_node,
|
||||
launch_testing.actions.ReadyToTest(),
|
||||
]),
|
||||
{
|
||||
'positionchanger_node': positionchanger_node,
|
||||
'failsafe_node': failsafe_node,
|
||||
}
|
||||
)
|
||||
|
||||
class TestPositionChanger(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
rclpy.init()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
rclpy.shutdown()
|
||||
|
||||
def setUp(self):
|
||||
self.node = rclpy.create_node('test_positionchanger')
|
||||
self.called_positionchanger_service = False
|
||||
self.received_failsafe_callback = False
|
||||
self.failsafe_reason_timeout = "Waiting for LIDAR timed out! Check the LIDAR USB connection and consider restarting the drone."
|
||||
self.failsafe_reason_nodata = "No healthy connection to LIDAR! Check the LIDAR USB cable and restart the drone."
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
self.node.destroy_node()
|
||||
|
||||
def failsafe_callback_nodata(self,msg):
|
||||
self.assertTrue(msg.enabled, "Failsafe was not enabled!")
|
||||
self.assertEqual(msg.msg,self.failsafe_reason_nodata, "Failsafe message was not correct!")
|
||||
self.received_failsafe_callback = True
|
||||
|
||||
def failsafe_callback_timeout(self,msg):
|
||||
self.assertTrue(msg.enabled, "Failsafe was not enabled!")
|
||||
self.assertEqual(msg.msg,self.failsafe_reason_timeout, "Failsafe message was not correct!")
|
||||
self.received_failsafe_callback = True
|
||||
|
||||
def move_position_callback(self,future):
|
||||
self.assertFalse(future.result().success, "MovePosition service call was successful, but should have failed!")
|
||||
self.called_positionchanger_service = True
|
||||
|
||||
def test_positionchanger_no_lidar_data(self,positionchanger_node,heartbeat_node):
|
||||
self.received_failsafe_callback = False
|
||||
self.called_positionchanger_service = False
|
||||
failsafe_subscriber = self.node.create_subscription(FailsafeMsg,'/drone/failsafe',self.failsafe_callback_nodata,10)
|
||||
move_position_client = self.node.create_client(MovePosition,'/drone/move_position')
|
||||
while not move_position_client.wait_for_service(timeout_sec=1.0):
|
||||
self.node.get_logger().info('move_position service not available, waiting again...')
|
||||
|
||||
request = MovePosition.Request()
|
||||
request.front_back = 1.0
|
||||
request.left_right = 1.0
|
||||
request.up_down = 1.0
|
||||
request.yaw = 1.0
|
||||
|
||||
end_time = time.time() + 10.0
|
||||
try:
|
||||
while time.time() < end_time:
|
||||
rclpy.spin_once(self.node)
|
||||
if not self.called_positionchanger_service:
|
||||
future = move_position_client.call_async(request)
|
||||
future.add_done_callback(self.move_position_callback)
|
||||
elif not self.received_failsafe_callback:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
self.assertTrue(self.received_failsafe_callback, "Failsafe callback was not received!")
|
||||
self.assertTrue(self.called_positionchanger_service, "MovePosition service was not called!")
|
||||
finally:
|
||||
self.node.destroy_client(move_position_client)
|
||||
self.node.destroy_subscription(failsafe_subscriber)
|
||||
|
||||
|
||||
|
||||
def test_positionchanger_lidar_stops(self,positionchanger_node,heartbeat_node):
|
||||
self.received_failsafe_callback = False
|
||||
self.called_positionchanger_service = False
|
||||
failsafe_subscriber = self.node.create_subscription(FailsafeMsg,'/drone/failsafe',self.failsafe_callback_timeout,10)
|
||||
lidar_publisher = self.node.create_publisher(LidarReading,'/drone/object_detection',10)
|
||||
move_position_client = self.node.create_client(MovePosition,'/drone/move_position')
|
||||
|
||||
while not move_position_client.wait_for_service(timeout_sec=1.0):
|
||||
self.node.get_logger().info('move_position service not available, waiting again...')
|
||||
|
||||
request = MovePosition.Request()
|
||||
request.front_back = 1.0
|
||||
request.left_right = 1.0
|
||||
request.up_down = 1.0
|
||||
request.yaw = 1.0
|
||||
|
||||
lidar_msg = LidarReading()
|
||||
lidar_msg.sensor_1 = 2.0
|
||||
lidar_msg.sensor_2 = 2.0
|
||||
lidar_msg.sensor_3 = 2.0
|
||||
lidar_msg.sensor_4 = 2.0
|
||||
lidar_msg.imu_data = [1.0,1.0,1.0,1.0]
|
||||
lidar_messages_published = 0
|
||||
|
||||
|
||||
# wait 5 seconds for the failsafe to trigger
|
||||
wait_time = time.time() + 5.0
|
||||
end_time = time.time() + 10.0
|
||||
try:
|
||||
while time.time() < end_time:
|
||||
rclpy.spin_once(self.node)
|
||||
# send 5 lidar messages and after that stop
|
||||
if (lidar_messages_published < 5):
|
||||
lidar_publisher.publish(lidar_msg)
|
||||
lidar_messages_published += 1
|
||||
# wait 5 seconds before sending the move_position request
|
||||
if not self.called_positionchanger_service and time.time() > wait_time:
|
||||
future = move_position_client.call_async(request)
|
||||
future.add_done_callback(self.move_position_callback)
|
||||
elif not self.received_failsafe_callback:
|
||||
continue
|
||||
break
|
||||
|
||||
self.assertTrue(self.called_positionchanger_service, "MovePosition service was not called!")
|
||||
self.assertTrue(self.received_failsafe_callback, "Failsafe was not activated!")
|
||||
|
||||
finally:
|
||||
self.node.destroy_client(move_position_client)
|
||||
self.node.destroy_subscription(failsafe_subscriber)
|
||||
self.node.destroy_publisher(lidar_publisher)
|
||||
@@ -65,8 +65,10 @@ class FailsafeUnitTest(unittest.TestCase):
|
||||
request = EnableFailsafe.Request()
|
||||
request.message = "test"
|
||||
|
||||
end_time = time.time() + 10.0
|
||||
|
||||
try:
|
||||
while True:
|
||||
while time.time() < end_time:
|
||||
rclpy.spin_once(self.node, timeout_sec=0.1)
|
||||
if (not self.service_called):
|
||||
future = failsafe_client.call_async(request)
|
||||
|
||||
@@ -8,6 +8,7 @@ import launch_ros.actions
|
||||
import launch_testing.actions
|
||||
import pytest
|
||||
import rclpy
|
||||
import time
|
||||
|
||||
from drone_services.srv import EnableFailsafe
|
||||
from drone_services.srv import SetTrajectory
|
||||
@@ -86,8 +87,9 @@ class TestPx4Failsafe(unittest.TestCase):
|
||||
set_trajectory_request.values = [1.0,1.0,1.0]
|
||||
set_trajectory_request.yaw = 0.0
|
||||
|
||||
end_time = time.time() + 10.0
|
||||
try:
|
||||
while True:
|
||||
while time.time() < end_time:
|
||||
rclpy.spin_once(self.node, timeout_sec=0.1)
|
||||
if not self.called_failsafe_service:
|
||||
failsafe_future = failsafe_client.call_async(failsafe_request)
|
||||
@@ -97,6 +99,8 @@ class TestPx4Failsafe(unittest.TestCase):
|
||||
trajectory_future.add_done_callback(self.trajectory_service_callback)
|
||||
else:
|
||||
break
|
||||
self.assertTrue(self.called_failsafe_service, "Failsafe service was not called")
|
||||
self.assertTrue(self.called_trajectory_service, "Trajectory service was not called")
|
||||
|
||||
finally:
|
||||
self.node.destroy_client(failsafe_client)
|
||||
@@ -119,8 +123,9 @@ class TestPx4Failsafe(unittest.TestCase):
|
||||
set_attitude_request.roll = 1.0
|
||||
set_attitude_request.thrust = 0.5
|
||||
|
||||
end_time = time.time() + 10.0
|
||||
try:
|
||||
while True:
|
||||
while time.time() < end_time:
|
||||
rclpy.spin_once(self.node, timeout_sec=0.1)
|
||||
if not self.called_failsafe_service:
|
||||
failsafe_future = failsafe_client.call_async(failsafe_request)
|
||||
@@ -130,6 +135,8 @@ class TestPx4Failsafe(unittest.TestCase):
|
||||
attitude_future.add_done_callback(self.attitude_service_callback)
|
||||
else:
|
||||
break
|
||||
self.assertTrue(self.called_failsafe_service, "Failsafe service was not called")
|
||||
self.assertTrue(self.called_attitude_service, "Attitude service was not called")
|
||||
|
||||
finally:
|
||||
self.node.destroy_client(failsafe_client)
|
||||
@@ -147,8 +154,9 @@ class TestPx4Failsafe(unittest.TestCase):
|
||||
failsafe_request.message = "test"
|
||||
arm_request = ArmDrone.Request()
|
||||
|
||||
end_time = time.time() + 10.0
|
||||
try:
|
||||
while True:
|
||||
while time.time() < end_time:
|
||||
rclpy.spin_once(self.node, timeout_sec=0.1)
|
||||
if not self.called_failsafe_service:
|
||||
failsafe_future = failsafe_client.call_async(failsafe_request)
|
||||
@@ -158,6 +166,8 @@ class TestPx4Failsafe(unittest.TestCase):
|
||||
arm_future.add_done_callback(self.arm_service_callback)
|
||||
else:
|
||||
break
|
||||
self.assertTrue(self.called_failsafe_service, "Failsafe service was not called")
|
||||
self.assertTrue(self.called_arm_service, "Arm service was not called")
|
||||
finally:
|
||||
self.node.destroy_client(failsafe_client)
|
||||
self.node.destroy_client(arm_client)
|
||||
Reference in New Issue
Block a user