TogetheROS | 用ChatRobot实现机器人控制

ChatGPT是一款强大的基于深度学习的自然语言处理模型,能够生成高质量的自然语言文本。TogetheROS是地平线面向机器人厂商和生态开发者推出的机器人操作系统,旨在释放在机器人等智能化场景的潜能,助力生态开发者和商业客户能够高效、便捷的进行机器人开发。将这两种技术融合在一起,可以实现根据使用文本控制机器人的功能。本教程将介绍如何使用ChatGPT+TogetheROS,即ChatRobot将文字描述转化为小车控制指令,以及如何根据描述生成特定功能的代码,使机器人按照描述执行相应的任务。

无图无真相,让我们先来看看实际效果!

效果展示:

使用文字描述控制小车运动:

使用ChatGPT生成代码实现要求的功能(小车原地旋转6秒):

使用ChatGPT生成代码实现要求的功能(小车正方形运动):

看完效果,咱们一起来了解一下背后的实现。工欲善其事必先利其器,开发之前,需要先安装好相关的依赖。

(一)安装相关功能包

本教程提供两种与chatGPT交互的方式:

使用OpenAI提供的API接口

#安装openai
pip install openai

使用revChatGPT功能包

实际上是与网页端的chatGPT交互:

#安装依赖
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
sudo apt-get install cargo

#安装revChatGPT
pip3 install --upgrade revChatGPT

安装完依赖,咱们再来看一下代码开发,首先创建工作空间,这块强烈建议学习一下胡老师的ROS2入门21讲

(二)代码开发

编写chatGPT_input.py

该程序实现chatGPT内容输入的节点:

#chatGPT_input.py

import rclpy
import openai
from rclpy.node import Node
from std_msgs.msg import String

class publisherNode(Node):
 def __init__(self):
 super().__init__('publisher_node')
        self.pub = self.create_publisher(String, 'user_input', 10)
 while rclpy.ok():
            msg = String()
            user_input = input("请输入要发布的消息: ")
            msg.data = user_input
            self.pub.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    publisher_node = publisherNode()
    rclpy.spin(publisher_node)
    publisher_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

编写chatGPT_base.py

该程序实现chatGPT交互以及文本中代码的提取、编译、运行的功能

#chatGPT_base.py

import re
import os
import rclpy
import openai
from rclpy.node import Node
from std_msgs.msg import String
from revChatGPT.V1 import Chatbot

chatbot = Chatbot(config={
 #access_token获取https://chat.openai.com/api/auth/session
 "access_token": "YOUR_ACCESS_TOKEN"
})
openai.api_key = "YOUR_API_KEY"

class OpenAINode(Node):
 def __init__(self):
 super().__init__('openai_node')
        self.declare_parameter('generate_code', False)
        self.declare_parameter('revChatGPT', False)
        self.generate_code = self.get_parameter('generate_code').value
        self.revChatGPT = self.get_parameter('revChatGPT').value
        self.publisher_ = self.create_publisher(String, 'chatgpt_node', 10)
        self.subscription = self.create_subscription(
            String,
 'user_input',
            self.listener_callback,
 10)
        self.history = ""
        self.get_logger().info("init success") 

 def listener_callback(self, msg):
 # 处理接收到的输入信息
        input_text = msg.data
        pub_msg = String()
        self.get_logger().info("start get respones") 
 if self.revChatGPT == False:
 #使用openAI提供的API
            self.history += input_text 
            self.history += " "
 # 调用OpenAI的gpt-3.5-turbo模型生成文本
            response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                    {"role": "user", "content": self.history+input_text}
                ]
            )
            pub_msg.data = response['choices'][0]['message']['content']
 print(pub_msg.data)
            self.history += pub_msg.data
            self.history += " "
 else:
 #使用revChatGPT功能包
            prev_text = ""
 for data in chatbot.ask(input_text,):
                message = data["message"][len(prev_text):]
 print(message, end="", flush=True)
                prev_text = data["message"]
            pub_msg.data = prev_text
 if self.generate_code == True:
 #提取文本消息中的代码
            start = "```python"
            end = "```"
 if start in prev_text and end in prev_text:
                start_index = prev_text.index(start) + len(start)
                end_index = prev_text.index(end, start_index)
                result = prev_text[start_index:end_index].strip()
 #将代码保存至同路径下的chatGPT_code.py文件
                self.file = open(os.getcwd()+'/src/chatGPT_test/chatGPT_test/chatGPT_code.py', 'w')
                self.file.write(result)
                self.file.close()
 #运行脚本,编译并运行程序
 #build_run.sh中是编译命令以及运行命令
                os.system("bash build_run.sh")
 print()
 #发布生成的文本信息
        self.publisher_.publish(pub_msg)

def main(args=None):
    rclpy.init(args=args)
    chatgpt_node = OpenAINode()
    rclpy.spin(chatgpt_node)
    chatgpt_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

编写chatGPT_control.py

该程序实现chatGPT文本内容解析功能,提取控制相关的信息,并填充至Twist消息中发布

#chatGPT_control.py

import re
import time
import rclpy

from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist

class controlNode(Node):
 def __init__(self):
 super().__init__('control_node')
        self.subscription = self.create_subscription(
            String,
 'chatgpt_node',
            self.listener_callback,
 10)
        self.publisher_ = self.create_publisher(Twist, 'cmd_vel', 10)
        self.history = ""

 def listener_callback(self, msg):
 #处理接收到的控制消息
 print("start parse msg")
        strlx = re.compile(r"linear.x = (.*?)(?![\d.-])")
        strly = re.compile(r"linear.y = (.*?)(?![\d.-])")
        strlz = re.compile(r"linear.z = (.*?)(?![\d.-])")
        strax = re.compile(r"angular.x = (.*?)(?![\d.-])")
        stray = re.compile(r"angular.y = (.*?)(?![\d.-])")
        straz = re.compile(r"angular.z = (.*?)(?![\d.-])")
        input_text = msg.data
        twist = Twist()
 #填充提取到的控制参数
 if re.search(strlx,input_text):
            val_linear_x = re.search(strlx,input_text).group(1)
            twist.linear.x = float(val_linear_x)
 print(val_linear_x , end = " ")
 if re.search(strly,input_text):
            val_linear_y = re.search(strly,input_text).group(1)
            twist.linear.y = float(val_linear_y)
 print(val_linear_y , end = " ")      
 if re.search(strlz,input_text):
            val_linear_z = re.search(strlz,input_text).group(1)
            twist.linear.z = float(val_linear_z)
 print(val_linear_z , end = " ")
 if re.search(strax,input_text):
            val_angular_x = re.search(strax,input_text).group(1)
            twist.angular.x = float(val_angular_x)
 print(val_angular_x , end = " ")  
 if re.search(stray,input_text):
            val_angular_y = re.search(stray,input_text).group(1)
            twist.angular.y = float(val_angular_y)
 print(val_angular_y , end = " ")
 if re.search(straz,input_text):
            val_angular_z = re.search(straz,input_text).group(1)
            twist.angular.z = float(val_angular_z)
 print(val_angular_z)

        self.publisher_.publish(twist)
        time.sleep(1)
        twist0 = Twist()
        self.publisher_.publish(twist0)

def main(args=None):
    rclpy.init(args=args)
    control_node = controlNode()
    rclpy.spin(control_node)
    control_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

代码文件结构

dev_ws
├── build_run.sh(编译和运行的脚本)
├── build
├── install
└── src
    └── chatGPT_test
        ├── package.xml
        ├── resource
        ├── setup.cfg
        ├── setup.py
        ├── test
        └──chatGPT_test
            ├── chatGPT_base.py
            ├── chatGPT_code.py(用于存放chatGPT生成的代码)
            ├── chatGPT_control.py
            └── chatGPT_input.py

整个功能包放在最后的附件中,大家有兴趣可以直接下载编译运行,需要填充chatGPT_base.py中的access_token和api_key,具体的运行步骤如下。

(三)运行流程

使用文字描述控制小车运动

输入文本参考

  • 首先输入:我现在已经将你的输出与ros节点连接,我希望通过你来控制小车运动,所以我需要你将我说的话转化成小车的运动指令,比如:“往前走”,我需要你回答:“linear.x = 1,linear.y = 0,linear.z = 0,angular.x = 0,angular.y = 0,angular.z = 0”。
  • 后续输入:前进、后退,左转,右转等,体验功能。

启动流程

#启动小车底盘控制节点(根据实际程序启动)
ros2 launch turn_on_wheeltec_robot turn_on_wheeltec_robot.launch.py

#进入工作空间,此处以dev_ws为例
cd /home/sunrise/dev_ws
#设置环境变量
source /opt/tros/setup.bash source install/setup.bash
#启动文本输入节点
ros2 run chatGPT_test chatGPT_input
#启动ChatGPT交互节点
ros2 run chatGPT_test chatGPT_base
#启动控制消息提取节点
ros2 run chatGPT_test chatGPT_control

流程图

使用ChatGPT生成代码实现要求的功能

输入文本参考

  • 小车原地旋转六秒:

请生成一个ROS 2 Python程序,用于控制小车旋转。具体要求如下:

1、小车需要以角速度0.5旋转6秒钟,旋转结束后将速度改为0.0,让小车停止运动。话题名应该为“/cmd_vel”;

2、使“geometry_msgs/msg/twist.hpp”作为twist消息的头文件,该消息包含线速度和角速度两个成员;

3、所有的twist消息都应该是浮点数类型。请确保生成的代码可以实现以上功能要求。只需要代码,不需要解释说明。

  • 小车以正方形轨迹运动:

请生成一个ROS 2 Python程序,用于控制小车运动。具体要求如下:

1、运动轨迹是一个正方形。话题名应该为“/cmd_vel”;

2、使用“geometry_msgs/msg/twist.hpp”作为twist消息的头文件,该消息包含线速度和角速度两个成员,线速度为0.2,移动时间为3秒。角速度为1.0,旋转时间为2秒;

3、时间控制使用time.sleep;

4、所有的twist消息都应该是浮点数类型;

5、运行结束后,将所有速度改为0,并休眠1秒钟,确保小车停止运动。请确保生成的代码可以实现以上功能要求。只需要代码,不需要解释说明。

启动流程

#启动小车底盘控制节点(根据实际程序启动)
ros2 launch turn_on_wheeltec_robot turn_on_wheeltec_robot.launch.py 

#进入工作空间,此处以dev_ws为例
cd /home/sunrise/dev_ws
#设置环境变量
source /opt/tros/setup.bash source install/setup.bash
#启动文本输入节点
ros2 run chatGPT_test chatGPT_input
#启动ChatGPT交互节点
ros2 run chatGPT_test chatGPT_base --ros-args -p generate_code:=True -p revChatGPT:=True

流程图

本次分享就到这啦!大家有兴趣可以尝试玩一下这两个功能,也可以基于这两个小功能开发更多好玩的东西!同时欢迎大家提出自己的想法,后续有更好玩的应用也会跟大家分享!

Tips:

  • ChatGPT注册流程以及基础功能搭建参考:当旭日X3派遇上ChatGpt
  • 所提供的两块功能是比较简单基础的功能,并且因为chatGPT的输出内容不稳定,有一定的失败几率,大家可以多多尝试更好的描述!
  • 使用revChatGPT,access_token可能一段时间会进行更新,需要同步更新。
  • 目前revChatGPT目前正在持续迭代,功能并不稳定,有相关问题可直接访问revChatGPT代码仓库查询。
  • 若访问速度较慢,可在程序中增加代理:
import os
os.environ["http_proxy"] = "代理服务器网址:端口"
os.environ["https_proxy"] = "代理服务器网址:端口"

dev_ws_20230314143706.rar

???