ChatGPT是一款强大的基于深度学习的自然语言处理模型,能够生成高质量的自然语言文本。TogetheROS是地平线面向机器人厂商和生态开发者推出的机器人操作系统,旨在释放在机器人等智能化场景的潜能,助力生态开发者和商业客户能够高效、便捷的进行机器人开发。将这两种技术融合在一起,可以实现根据使用文本控制机器人的功能。本教程将介绍如何使用ChatGPT+TogetheROS,即ChatRobot将文字描述转化为小车控制指令,以及如何根据描述生成特定功能的代码,使机器人按照描述执行相应的任务。
无图无真相,让我们先来看看实际效果!
效果展示:
使用文字描述控制小车运动:
使用ChatGPT生成代码实现要求的功能(小车原地旋转6秒):
使用ChatGPT生成代码实现要求的功能(小车正方形运动):
看完效果,咱们一起来了解一下背后的实现。工欲善其事必先利其器,开发之前,需要先安装好相关的依赖。
(一)安装相关功能包
本教程提供两种与chatGPT交互的方式:
使用OpenAI提供的API接口
#安装openai
pip install openai
使用revChatGPT功能包
实际上是与网页端的chatGPT交互:
#安装依赖
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
sudo apt-get install cargo
#安装revChatGPT
pip3 install --upgrade revChatGPT
安装完依赖,咱们再来看一下代码开发,首先创建工作空间,这块强烈建议学习一下胡老师的ROS2入门21讲!
(二)代码开发
编写chatGPT_input.py
该程序实现chatGPT内容输入的节点:
#chatGPT_input.py
import rclpy
import openai
from rclpy.node import Node
from std_msgs.msg import String
class publisherNode(Node):
def __init__(self):
super().__init__('publisher_node')
self.pub = self.create_publisher(String, 'user_input', 10)
while rclpy.ok():
msg = String()
user_input = input("请输入要发布的消息: ")
msg.data = user_input
self.pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
publisher_node = publisherNode()
rclpy.spin(publisher_node)
publisher_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
编写chatGPT_base.py
该程序实现chatGPT交互以及文本中代码的提取、编译、运行的功能

#chatGPT_base.py
import re
import os
import rclpy
import openai
from rclpy.node import Node
from std_msgs.msg import String
from revChatGPT.V1 import Chatbot
chatbot = Chatbot(config={
#access_token获取https://chat.openai.com/api/auth/session
"access_token": "YOUR_ACCESS_TOKEN"
})
openai.api_key = "YOUR_API_KEY"
class OpenAINode(Node):
def __init__(self):
super().__init__('openai_node')
self.declare_parameter('generate_code', False)
self.declare_parameter('revChatGPT', False)
self.generate_code = self.get_parameter('generate_code').value
self.revChatGPT = self.get_parameter('revChatGPT').value
self.publisher_ = self.create_publisher(String, 'chatgpt_node', 10)
self.subscription = self.create_subscription(
String,
'user_input',
self.listener_callback,
10)
self.history = ""
self.get_logger().info("init success")
def listener_callback(self, msg):
# 处理接收到的输入信息
input_text = msg.data
pub_msg = String()
self.get_logger().info("start get respones")
if self.revChatGPT == False:
#使用openAI提供的API
self.history += input_text
self.history += " "
# 调用OpenAI的gpt-3.5-turbo模型生成文本
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": self.history+input_text}
]
)
pub_msg.data = response['choices'][0]['message']['content']
print(pub_msg.data)
self.history += pub_msg.data
self.history += " "
else:
#使用revChatGPT功能包
prev_text = ""
for data in chatbot.ask(input_text,):
message = data["message"][len(prev_text):]
print(message, end="", flush=True)
prev_text = data["message"]
pub_msg.data = prev_text
if self.generate_code == True:
#提取文本消息中的代码
start = "```python"
end = "```"
if start in prev_text and end in prev_text:
start_index = prev_text.index(start) + len(start)
end_index = prev_text.index(end, start_index)
result = prev_text[start_index:end_index].strip()
#将代码保存至同路径下的chatGPT_code.py文件
self.file = open(os.getcwd()+'/src/chatGPT_test/chatGPT_test/chatGPT_code.py', 'w')
self.file.write(result)
self.file.close()
#运行脚本,编译并运行程序
#build_run.sh中是编译命令以及运行命令
os.system("bash build_run.sh")
print()
#发布生成的文本信息
self.publisher_.publish(pub_msg)
def main(args=None):
rclpy.init(args=args)
chatgpt_node = OpenAINode()
rclpy.spin(chatgpt_node)
chatgpt_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
编写chatGPT_control.py
该程序实现chatGPT文本内容解析功能,提取控制相关的信息,并填充至Twist消息中发布
#chatGPT_control.py
import re
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
class controlNode(Node):
def __init__(self):
super().__init__('control_node')
self.subscription = self.create_subscription(
String,
'chatgpt_node',
self.listener_callback,
10)
self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
self.history = ""
def listener_callback(self, msg):
#处理接收到的控制消息
print("start parse msg")
strlx = re.compile(r"linear.x = (.*?)(?![\d.-])")
strly = re.compile(r"linear.y = (.*?)(?![\d.-])")
strlz = re.compile(r"linear.z = (.*?)(?![\d.-])")
strax = re.compile(r"angular.x = (.*?)(?![\d.-])")
stray = re.compile(r"angular.y = (.*?)(?![\d.-])")
straz = re.compile(r"angular.z = (.*?)(?![\d.-])")
input_text = msg.data
twist = Twist()
#填充提取到的控制参数
if re.search(strlx,input_text):
val_linear_x = re.search(strlx,input_text).group(1)
twist.linear.x = float(val_linear_x)
print(val_linear_x , end = " ")
if re.search(strly,input_text):
val_linear_y = re.search(strly,input_text).group(1)
twist.linear.y = float(val_linear_y)
print(val_linear_y , end = " ")
if re.search(strlz,input_text):
val_linear_z = re.search(strlz,input_text).group(1)
twist.linear.z = float(val_linear_z)
print(val_linear_z , end = " ")
if re.search(strax,input_text):
val_angular_x = re.search(strax,input_text).group(1)
twist.angular.x = float(val_angular_x)
print(val_angular_x , end = " ")
if re.search(stray,input_text):
val_angular_y = re.search(stray,input_text).group(1)
twist.angular.y = float(val_angular_y)
print(val_angular_y , end = " ")
if re.search(straz,input_text):
val_angular_z = re.search(straz,input_text).group(1)
twist.angular.z = float(val_angular_z)
print(val_angular_z)
self.publisher_.publish(twist)
time.sleep(1)
twist0 = Twist()
self.publisher_.publish(twist0)
def main(args=None):
rclpy.init(args=args)
control_node = controlNode()
rclpy.spin(control_node)
control_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
代码文件结构
dev_ws
├── build_run.sh(编译和运行的脚本)
├── build
├── install
└── src
└── chatGPT_test
├── package.xml
├── resource
├── setup.cfg
├── setup.py
├── test
└──chatGPT_test
├── chatGPT_base.py
├── chatGPT_code.py(用于存放chatGPT生成的代码)
├── chatGPT_control.py
└── chatGPT_input.py
整个功能包放在最后的附件中,大家有兴趣可以直接下载编译运行,需要填充chatGPT_base.py中的access_token和api_key,具体的运行步骤如下。
(三)运行流程
使用文字描述控制小车运动
输入文本参考
- 首先输入:我现在已经将你的输出与ros节点连接,我希望通过你来控制小车运动,所以我需要你将我说的话转化成小车的运动指令,比如:“往前走”,我需要你回答:“linear.x = 1,linear.y = 0,linear.z = 0,angular.x = 0,angular.y = 0,angular.z = 0”。
- 后续输入:前进、后退,左转,右转等,体验功能。
启动流程
#启动小车底盘控制节点(根据实际程序启动)
ros2 launch turn_on_wheeltec_robot turn_on_wheeltec_robot.launch.py
#进入工作空间,此处以dev_ws为例
cd /home/sunrise/dev_ws
#设置环境变量
source /opt/tros/setup.bash source install/setup.bash
#启动文本输入节点
ros2 run chatGPT_test chatGPT_input
#启动ChatGPT交互节点
ros2 run chatGPT_test chatGPT_base
#启动控制消息提取节点
ros2 run chatGPT_test chatGPT_control
流程图

使用ChatGPT生成代码实现要求的功能
输入文本参考
- 小车原地旋转六秒:
请生成一个ROS 2 Python程序,用于控制小车旋转。具体要求如下:
1、小车需要以角速度0.5旋转6秒钟,旋转结束后将速度改为0.0,让小车停止运动。话题名应该为“/cmd_vel”;
2、使“geometry_msgs/msg/twist.hpp”作为twist消息的头文件,该消息包含线速度和角速度两个成员;
3、所有的twist消息都应该是浮点数类型。请确保生成的代码可以实现以上功能要求。只需要代码,不需要解释说明。
- 小车以正方形轨迹运动:
请生成一个ROS 2 Python程序,用于控制小车运动。具体要求如下:
1、运动轨迹是一个正方形。话题名应该为“/cmd_vel”;
2、使用“geometry_msgs/msg/twist.hpp”作为twist消息的头文件,该消息包含线速度和角速度两个成员,线速度为0.2,移动时间为3秒。角速度为1.0,旋转时间为2秒;
3、时间控制使用time.sleep;
4、所有的twist消息都应该是浮点数类型;
5、运行结束后,将所有速度改为0,并休眠1秒钟,确保小车停止运动。请确保生成的代码可以实现以上功能要求。只需要代码,不需要解释说明。
启动流程
#启动小车底盘控制节点(根据实际程序启动)
ros2 launch turn_on_wheeltec_robot turn_on_wheeltec_robot.launch.py
#进入工作空间,此处以dev_ws为例
cd /home/sunrise/dev_ws
#设置环境变量
source /opt/tros/setup.bash source install/setup.bash
#启动文本输入节点
ros2 run chatGPT_test chatGPT_input
#启动ChatGPT交互节点
ros2 run chatGPT_test chatGPT_base --ros-args -p generate_code:=True -p revChatGPT:=True
流程图

本次分享就到这啦!大家有兴趣可以尝试玩一下这两个功能,也可以基于这两个小功能开发更多好玩的东西!同时欢迎大家提出自己的想法,后续有更好玩的应用也会跟大家分享!
Tips:
- ChatGPT注册流程以及基础功能搭建参考:当旭日X3派遇上ChatGpt;
- 所提供的两块功能是比较简单基础的功能,并且因为chatGPT的输出内容不稳定,有一定的失败几率,大家可以多多尝试更好的描述!
- 使用revChatGPT,access_token可能一段时间会进行更新,需要同步更新。
- 目前revChatGPT目前正在持续迭代,功能并不稳定,有相关问题可直接访问revChatGPT代码仓库查询。
- 若访问速度较慢,可在程序中增加代理:
import os
os.environ["http_proxy"] = "代理服务器网址:端口"
os.environ["https_proxy"] = "代理服务器网址:端口"