#!/usr/bin/env python3 #-*- coding: utf-8 -*- import rclpy from rclpy.node import Node from sensor_msgs.msg import Image,CompressedImage from std_msgs.msg import String from cv_bridge import CvBridge import cv2 from pyzbar.pyzbar import decode class QRCodeScannerNode(Node): def __init__(self): super().__init__('qrcode_scanner_node') self.declare_parameter('subscribe_topic', '/image_compressed') self.declare_parameter('publish_text_topic', '/qrcode_text') self.declare_parameter('publish_image_topic', '/qrcode_image') self.declare_parameter('display', True) self.subscribe_topic = self.get_parameter('subscribe_topic').value self.publish_text_topic = self.get_parameter('publish_text_topic').value self.publish_image_topic = self.get_parameter('publish_image_topic').value self.display = self.get_parameter('display').value self.bridge = cv_bridge.CvBridge() self.image_publisher = self.create_publisher(Image, self.publish_image_topic, 10) self.text_publisher = self.create_publisher(String, self.publish_text_topic, 10) self.subscription = self.create_subscription( CompressedImage,"/image_compressed", self.subscribe_topic, self.scan_callback, 10) self.subscription self.scanned_data = set() self.get_logger().info('Subscribed to {self.subscribe_topic},started') def scan_callback(self, msg): self.get_logger().info(f'Received image on {self.subscribe_topic}, format: {msg.format}, size: {len(msg.data)} bytes') try: frame = self.bridge.compressed_imgmsg_to_cv2(msg) self.get_logger().info(f'Image converted, shape: {frame.shape if frame is not None else "None"}') except Exception as e: self.get_logger().error(f"error in transform: {e}") return decoded_objects = decode(frame) self.get_logger().info(f'Detected {len(decoded_objects)} QR codes') new_data_detected = set() text_msg = String() detected_texts = [] for obj in decoded_objects: data = obj.data.decode('utf-8') if data not in self.scanned_data: self.get_logger().info(f"detect a new qr_code: {data}") new_data_detected.add(data) detected_texts.append(data) pts = obj.polygon if len(pts) == 4: pts = [(point.x, point.y) for point in pts] for j in range(4): cv2.line(frame, pts[j], pts[(j+1)%4], (0, 255, 0), 3) cv2.putText(frame, data, (obj.rect.left, obj.rect.top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) self.scanned_data.update(new_data_detected) if detected_texts: text_msg.data = ', '.join(detected_texts) self.text_publisher.publish(text_msg) self.get_logger().info(f'Published QR code text: {text_msg.data}') if self.image_publisher.get_subscription_count() > 0: img_msg = self.bridge.cv2_to_imgmsg(frame, 'bgr8') img_msg.header.stamp = self.get_clock().now().to_msg() self.image_publisher.publish(img_msg) self.get_logger().info('Published processed image') if self.display: cv2.imshow('QR detect', frame) cv2.waitKey(1) def destroy_node(self): cv2.destroyAllWindows() super().destroy_node() def main(args=None): rclpy.init(args=args) node = QRCodeScannerNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()