make node publish on status topic

This commit is contained in:
Sem van der Hoeven
2023-06-09 19:40:38 +02:00
parent 8cc120c10f
commit bcc14c9e0d

View File

@@ -9,8 +9,7 @@ import launch_ros.actions
import launch_testing.actions import launch_testing.actions
import pytest import pytest
import rclpy import rclpy
from rclpy import qos
from px4_msgs.msg import BatteryStatus
from drone_services.msg import FailsafeMsg from drone_services.msg import FailsafeMsg
from drone_services.msg import DroneStatus from drone_services.msg import DroneStatus
@@ -31,8 +30,6 @@ def generate_test_description():
package='drone_controls', executable='position_changer') package='drone_controls', executable='position_changer')
px4_controller_node = launch_ros.actions.Node( px4_controller_node = launch_ros.actions.Node(
package='px4_connection', executable='px4_controller') package='px4_connection', executable='px4_controller')
drone_status_node = launch_ros.actions.Node(
package='drone_status', executable='drone_status')
heartbeat_node = launch_ros.actions.Node( heartbeat_node = launch_ros.actions.Node(
package='px4_connection', executable='heartbeat') package='px4_connection', executable='heartbeat')
@@ -43,7 +40,6 @@ def generate_test_description():
camera_node, camera_node,
position_changer_node, position_changer_node,
px4_controller_node, px4_controller_node,
drone_status_node,
heartbeat_node, heartbeat_node,
launch_testing.actions.ReadyToTest(), launch_testing.actions.ReadyToTest(),
]), ]),
@@ -53,7 +49,6 @@ def generate_test_description():
'camera_node': camera_node, 'camera_node': camera_node,
'position_changer_node': position_changer_node, 'position_changer_node': position_changer_node,
'px4_controller_node': px4_controller_node, 'px4_controller_node': px4_controller_node,
'drone_status_node': drone_status_node,
'heartbeat_node': heartbeat_node 'heartbeat_node': heartbeat_node
} }
) )
@@ -86,19 +81,17 @@ class ApiListenerTest(unittest.TestCase):
self.node.get_logger().info("Received status callback " + str(msg)) self.node.get_logger().info("Received status callback " + str(msg))
def test_api_listener_battery(self, api_listener_node, proc_output): def test_api_listener_battery(self, api_listener_node, proc_output):
qos_profile = qos.QoSProfile(
reliability=qos.QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
durability=qos.QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL,
history=qos.QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
depth=1
)
battery_publisher = self.node.create_publisher(BatteryStatus, '/fmu/out/battery_status',qos_profile=qos_profile)
failsafe_subscriber = self.node.create_subscription(FailsafeMsg, '/drone/failsafe', self.failsafe_callback, 10) failsafe_subscriber = self.node.create_subscription(FailsafeMsg, '/drone/failsafe', self.failsafe_callback, 10)
drone_status_subscriber = self.node.create_subscription(DroneStatus, '/drone/status', self.status_callback, 10) drone_status_publisher = self.node.create_publisher(DroneStatus, '/drone/status', 10)
msg = BatteryStatus() msg = DroneStatus()
msg.remaining = 0.10 msg.battery_percentage = 10.0
msg.armed = True
msg. height = 10.0
msg.control_mode = "attitude"
msg.cpu_usage = 10.0
msg.route_setpoint = 0
msg.position = [0.0,0.0,0.0]
msg.velocity = [0.0,0.0,0.0]
time.sleep(5) # wait for nodes to start time.sleep(5) # wait for nodes to start
self.node.get_logger().info("Starting publishing") self.node.get_logger().info("Starting publishing")
@@ -108,10 +101,10 @@ class ApiListenerTest(unittest.TestCase):
while time.time() < end_time: while time.time() < end_time:
rclpy.spin_once(self.node, timeout_sec=0.1) rclpy.spin_once(self.node, timeout_sec=0.1)
# self.node.get_logger().info("publishing message") # self.node.get_logger().info("publishing message")
battery_publisher.publish(msg) drone_status_publisher.publish(msg)
if self.received_failsafe_callback: if self.received_failsafe_callback:
break break
self.assertTrue(self.received_failsafe_callback, "Failsafe was not enabled!") self.assertTrue(self.received_failsafe_callback, "Failsafe was not enabled!")
finally: finally:
self.node.destroy_subscription(failsafe_subscriber) self.node.destroy_subscription(failsafe_subscriber)
self.node.destroy_publisher(battery_publisher) self.node.destroy_publisher(drone_status_publisher)