本科阶段最后一次竞赛Vlog——2024年智能车大赛智慧医疗组准备全过程——11上位机与小车交互
根据上一节的配置,目前已经建立了通讯环境
1.运动控制
1.1 键盘运动控制测试
回到命令行,可以看到,按下r,然后再按下wasd此时小车就可以移动了。按下p,此时按下wasd就不会动了
image-20241216114711597 image-20241216115052777image-20241216115133146!!!注意,此刻有人要问了,我按了r,但是为什么我的小车不动啊
!!!这个这个,我也,底盘没开!!!看似哈哈一个小问题,有时候流程太多了难免会忘记,教程分开写,其实每次都从头开始也不太好搞,所以就是你把我的所有文章看完,就会发现能串起来。
当你把底盘打开后,地图上就会出现小车的icon和右下角的odom话题接收的数据,我这里没有是因为,我目前没有车,只拿了X3演示
image-202412161414164621.2上位机命令控制
在比赛过程中,要求使用下面这两个按钮进行任务一停车和任务三的启动,当时我还很好奇这怎么能控制呢
image-20241216141459303直到有一天我发现,我点击了这个面板,左边显示了话题,原来是话题发布,那么我们车上只要写一个接收话题就知道了
image-20241216141633603所以开始编码!
经过对demo.json的配置进行读取就可以看出,发布的数据是5和6
"Publish!2ih9jm3": {
"buttonText": "C区进行遥测",
"buttonTooltip": "点击发布信号",
"advancedView": false,
"value": "{\n \"data\": 5\n}",
"topicName": "/sign4return",
"datatype": "std_msgs/msg/Int32",
"foxglovePanelTitle": "正在C区进行遥测"
},
"Publish!ibzz6v": {
"buttonText": "C区出口结束遥测",
"buttonTooltip": "点击发布信号",
"advancedView": false,
"value": "{\n \"data\": 6\n}",
"topicName": "/sign4return",
"datatype": "std_msgs/msg/Int32",
"foxglovePanelTitle": "C区出口结束遥测"
},
代码如下
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
class SignSubscriber(Node):
def __init__(self):
super().__init__('sign_subscriber') # 节点初始化
self.sign_sub = self.create_subscription(
Int32, # 消息类型
'sign4return', # 订阅的话题
self.sign_callback, # 回调函数
10 # 队列长度
)
self.keyboard_flag = 0 # 初始化标志
def sign_callback(self, data):
# 根据收到的消息内容执行不同的操作
if data.data == 6: # yaoce 结束
self.keyboard_flag = 2
self.get_logger().info('Received signal 6: Yaoce finished, keyboard_flag set to 2')
# 执行其他操作...
elif data.data == 5: # 开始遥控
self.keyboard_flag = 1
self.get_logger().info('Received signal 5: Remote control started, keyboard_flag set to 1')
# 执行其他操作...
else:
self.get_logger().info(f'Received unknown signal: {data.data}')
# 可以根据需要进行更多处理...
def main(args=None):
rclpy.init(args=args) # 初始化ROS2客户端库
node = SignSubscriber() # 创建并运行节点
rclpy.spin(node) # 持续运行节点
node.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭ROS2客户端库
if __name__ == '__main__':
main()
image-20241216143743283
image-20241216143814258
2.小车信号展示
2.1 二维码展示
点击小车面板可以看到,其实这个地方也是接收了一个话题
image-20241216142524985那么过对demo.json的配置进行读取就可以看出,发布的数据是3和4
代码如下
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from origincar_msg.msg import Sign
class SignPublisher(Node):
def __init__(self):
super().__init__('sign_publisher')
self.sign_switchpublisher = self.create_publisher(Sign,'/sign_switch',10)
# 发布初始消息
msg = Sign()
msg.sign_data = 0
self.publisher_.publish(msg)
# 创建一个定时器,每5秒调用一次 timer_callback
self.timer = self.create_timer(5, self.timer_callback)
def timer_callback(self):
msg = Sign()
'''
qrinf=这个地方换成你的判断顺逆代码
#这个地方写个判断顺时针逆时针,给result赋值3或者4
result=....
'''
msg.sign_data = result
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: {msg.sign_data}')
def main(args=None):
rclpy.init(args=args)
sign_publisher = SignPublisher()
rclpy.spin(sign_publisher)
sign_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
接下来搭配着识别二维码的代码就可以实现顺时针和逆时针啦
image-202412161424262492.2 图像展示
上位机展示图像是非常重要的一步,因为任务2遥控需要第二个选手脱离物理环境
看到群里大家交流时候,遇到好多问题,这里给大家进行一一解决
话不多说先给一般能用的低配代码,记住代码运行后,要点击右侧image面板切换话题,后面这步骤就不每个都演示了,此外除了2.2.3 方案别的就不演示结果截图了,为什么呢因为我现在没摄像头哈哈哈,展示的也是在本系列的第一个哔哩哔哩视频可以看到的,Vlog里面是有的。
image-20241216144419086
2.2.1 基础编码方案
比赛时候的USB摄像头原始接收到的是Mjpeg格式,但是foxglove并不支持,所以很多小伙伴发现面板话题那个地方有个红色感叹号,点击触摸后,发现提示信息 “不支持的编码xxx“,所以需要把jpeg改成支持的bgr8
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import cv2
import numpy as np
class ImageDecoder(Node):
def __init__(self):
super().__init__('image_decoder')
self.subscription = self.create_subscription(
Image,
'/image',
self.image_callback,
10)
self.publisher = self.create_publisher(Image, '/decoded_image', 10)
self.bridge = CvBridge()
def image_callback(self, msg):
# 检查图像数据是否为JPEG格式
if msg.encoding.lower() == 'jpeg':
# 尝试解码JPEG数据
try:
np_arr = np.frombuffer(msg.data, np.uint8)
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
# 将解码后的图像转换回Image消息并发布
image_message = self.bridge.cv2_to_imgmsg(cv_image, "bgr8")
self.publisher.publish(image_message)
except CvBridgeError as e:
self.get_logger().error('Failed to convert image: %s' % str(e))
else:
# 如果不是JPEG数据,则直接发布原始消息
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = ImageDecoder()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
其实这种会让传输压力非常大,造成延迟非常高,所以这种其实用不了
2.2.2 MC的世界暴力压缩方案
接下来还有一种就是MC的世界,效果延迟在0.5秒左右,在实际使用其实也还行
除了画质不清楚,但是为了比赛,这作为没有办法的办法,对比任务二色彩差异大的地图其实够用,话不多说直接放代码
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import cv2
import numpy as np
class ImageCompressor(Node):
def __init__(self):
super().__init__('image_compressor')
self.subscription = self.create_subscription(
Image,
'/image',
self.image_callback,
10)
self.publisher = self.create_publisher(Image, '/compressed_image', 10)
self.bridge = CvBridge()
def image_callback(self, msg):
# 检查图像数据是否为JPEG格式
if msg.encoding.lower() == 'jpeg':
# 尝试解码JPEG数据
try:
np_arr = np.frombuffer(msg.data, np.uint8)
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
# 重新编码图像以降低质量,第二个参数是图像格式,第三个参数是质量参数
# 质量参数范围从0到100(高质量),这里设为50以降低图像质量
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 10]
result, encimg = cv2.imencode('.jpg', cv_image, encode_param)
if result:
# 将压缩后的图像转换回Image消息并发布
# 注意,由于图像已经是压缩过的JPEG格式,我们使用frombuffer来创建消息
compressed_img_msg = self.bridge.cv2_to_imgmsg(np.array(cv2.imdecode(encimg, 1)), "bgr8")
self.publisher.publish(compressed_img_msg)
except CvBridgeError as e:
self.get_logger().error('Failed to convert image: %s' % str(e))
else:
# 如果不是JPEG数据,则直接发布原始消息
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
image_compressor = ImageCompressor()
try:
rclpy.spin(image_compressor)
except KeyboardInterrupt:
pass
finally:
image_compressor.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
2.2.3 零拷贝
零拷贝,因为前期没有太多时间,后期只是简单的运行起来了
但是明没有和其他任务结合共享图像数据,所以总的来说还是有继续优化空间,这里把命令给大家,大家有能力的可以,继续研究下,这个当时画质非常清楚,并且无延迟,可以堪称无损了哈哈,效果大家可能要去本系列的第一个vlog里面去哔哩哔哩看啦
ros2 launch hobot_usb_cam hobot_usb_cam.launch.py usb_video_device:=/dev/video8 usb_image_width:=640 usb_image_height:=360
ros2 launch hobot_codec hobot_codec_encode.launch.py codec_in_mode:=shared_mem codec_out_mode:=ros codec_in_format:=jpeg codec_out_format:=bgr8 codec_sub_topic:=/hbmem_image codec_pub_topic:=/image2 codec_jpg_quality:=10.0 codec_output_framerate:=15
2.2.4 分布式
对于分布式方案,稳定性还是很高的,其实主要是个思路,实现起来不难
首先在虚拟机配置和小车一个domain,然后虚拟机中订阅image话题,然后处理使用linux版的foxglove进行展示(Linux版本网盘也有),由于我目前没有环境没法给大家展示了,大家在这一步有问题的可以评论区dd我,我和大家一起解决
3.总结
对于foxglove与小车交互,目前到此也就这些了
其实最重要的就是画面的传输清晰度与稳定性,大多数小伙伴们比赛都受到了,网络拥塞问题导致画面延迟非常大,其实就两个办法,一个算法压缩,另外一个就是网络攻防战哈哈,这个可以选择不要一个频段,路由器都有2.4G 5G,这个大家去摸索下吧,会有惊喜的!!!










