本帖最后由 J.Xiao 于 2024-4-30 14:10 编辑
一、什么是语音控制
语音识别技术,也被称为自动语音识别(Automatic Speech Recognition,ASR),其目标是将人类的语音中的词汇内容转换为计算机可读的输入。语音识别技术属于人工智能方向的一个重要分支,涉及许多学科,如信号处理、计算机科学、语言学、声学、生理学、心理学等,是人机自然交互技术中的关键环节。它允许计算机理解人类语音并将其转换为文本。比如我们常见的 Alexa、天猫精灵,小爱同学等,都使用了语音识别技术。通过将语识别技术与控制相结合,就可以实现语音控制机器人了。
二、语音识别原理
目前,随着wav2vec2、Conformer 和 Hubert 等最先进模型的最新发展极大地推动了语音识别领域的发展。这些模型采用无需人工标记数据即可从原始音频中学习的技术,从而使它们能够有效地使用未标记语音的大型数据集。它们还被扩展为使用多达 1,000,000 小时的训练数据,远远超过学术监督数据集中使用的传统 1,000 小时,但是以监督方式跨多个数据集和领域预训练的模型已被发现表现出更好的鲁棒性和对持有数据集的泛化,所以执行语音识别等任务仍然需要微调,这限制了它们的全部潜力 。为了解决这个问题OpenAI 开发了 Whisper,一种利用弱监督方法的模型。它是一种自动语音识别 (ASR) 系统,并对从网络收集的680000 小时的多语言和多任务监督数据进行训练。通过使用如此庞大而多样化的数据集提高对口音、背景噪音和技术语言的鲁棒性。此外,它可以转录多种语言,以及从这些语言翻译成英语。该模型实现方法如下图所示:
Whisper架构是一种简单的端到端方法,作为编码器-解码器实现。输入音频被分成 30 秒的块,转换为 log-Mel 频谱图,然后传递到编码器中。训练解码器来预测相应的文本标题,并与指示单个模型执行语言识别、短语级时间戳、多语言语音听录和英语语音翻译等任务的特殊标记混合在一起。
其中,转换器序列到序列模型针对各种语音处理任务进行训练,包括多语言语音识别、语音翻译、口语识别和语音活动检测。这些任务共同表示为解码器要预测的一系列令牌,允许单个模型取代传统语音处理管道的许多阶段。多任务训练格式使用一组特殊标记,用作任务说明符或分类目标。
三、安装方法
Whisper官方使用Python 3.9.9和PyTorch 1.10.1来训练和测试模型,但代码库可与Python 3.8-3.10和最近的PyTorch(上节课已安装)版本兼容。
1、安装依赖库
首先需要安装编解码包,即我们常用的ffmpeg。
sudo apt update && sudo apt install ffmpeg
然后,安装相关的Python包:
sudo apt-get install python3-pyaudio sounddevice
2、安装Whisper
此时,就可以使用以下命令下载并安装(或更新到)最新版本的 Whisper:
pip install -U openai-whisper
或者,也可以从此存储库中提取并安装最新的提交及其 Python 依赖项:
pip install git+https://github.com/openai/whisper.git
如果要将软件包更新到此存储库的最新版本,还可以运行:
pip install --upgrade --no-deps --force-reinstall git+https://github.com/openai/whisper.git
3、模型选择
该模型五种型号尺寸,其中四种为纯英文版本,提供速度和准确性的权衡。以下是可用型号的名称及其大致的内存要求和相对速度。一般模型越大识别效果越好,但同时需要的性能也越高。
Size | Parameters | English-only model | Multilingual model | Required VRAM | Relative speed |
tiny | 39 M | tiny.en | tiny | ~1 GB | ~32x | base | 74 M | base.en | base | ~1 GB | ~16x | small | 244 M | small.en | small | ~2 GB | ~6x | medium | 769 M | medium.en | medium | ~5 GB | ~2x | large | 1550 M | N/A | large | ~10 GB | 1x |
4、功能测试
可以通过如下命令将使用base模型转录音频文件中的语音:
whisper audio.flac audio.mp3 audio.wav --model base
默认设置(选择small模型)适用于转录英语。要转录包含非英语语音的音频文件,您可以使用以下选项指定语言:
whisper chinese.wav --language Chinese
另外,添加--task translate 还会将语音翻译成英文:
whisper chinese.wav --language Chinese --task translate
更多使用方法还可以运行以下命令以查看所有可用选项:
whisper --help
四、编写语音识别节点
我们可以尝试编写一个ROS2的语音识别节点,代码放在spark_ros2/src/spark_app/spark_voice/spark_voice/local_asr.py中:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import whisper
import numpy as np
import sounddevice as sd
from scipy.io.wavfile import write
Model = 'base' # Whisper model size (tiny, base, small, medium, large)
Translate = False # Translate non-English to English?
SampleRate = 44100 # Stream device recording frequency
BlockSize = 30 # Block size in milliseconds
Threshold = 0.1 # Minimum volume threshold to activate listening
Vocals = [50, 1000] # Frequency range to detect sounds that could be speech
EndBlocks = 40 # Number of blocks to wait before sending to Whisper
class SpeechRecognition(Node):
def __init__(self):
super().__init__('SpeechRecognition')
self.declare_parameter('language', 'zh')
self.lang=self.get_parameter('language').get_parameter_value().string_value
self.running = True
self.padding = 0
self.prevblock = self.buffer = np.zeros((0,1))
self.fileready = False
self.get_logger().info("Loading Whisper Model..")
self.model = whisper.load_model(Model)
self.publisher_ = self.create_publisher(String, '/voice/stt', 10)
def callback(self, indata, frames, time, status):
#if status: print(status) # for debugging, prints stream errors.
if not any(indata):
return
# A few alternative methods exist for detecting speech.. #indata.max() > Threshold
#zero_crossing_rate = np.sum(np.abs(np.diff(np.sign(indata)))) / (2 * indata.shape[0]) # threshold 20
freq = np.argmax(np.abs(np.fft.rfft(indata[:, 0]))) * SampleRate / frames
if np.sqrt(np.mean(indata**2)) > Threshold and Vocals[0] <= freq <= Vocals[1]:
print('.', end='', flush=True)
if self.padding < 1: self.buffer = self.prevblock.copy()
self.buffer = np.concatenate((self.buffer, indata))
self.padding = EndBlocks
else:
self.padding -= 1
if self.padding > 1:
self.buffer = np.concatenate((self.buffer, indata))
elif self.padding < 1 < self.buffer.shape[0] > SampleRate: # if enough silence has passed, write to file.
self.fileready = True
write('dictate.wav', SampleRate, self.buffer) # I'd rather send data to Whisper directly..
self.buffer = np.zeros((0,1))
elif self.padding < 1 < self.buffer.shape[0] < SampleRate: # if recording not long enough, reset buffer.
self.buffer = np.zeros((0,1))
else:
self.prevblock = indata.copy() #np.concatenate((self.prevblock[-int(SampleRate/10):], indata)) # SLOW
def process(self):
if self.fileready:
self.get_logger().info("Transcribing {}..".format(self.lang))
result = self.model.transcribe('dictate.wav',fp16=False,language=self.lang,task='translate' if Translate else 'transcribe')
self.get_logger().info(result['text'])
msg = String()
msg.data = result['text']
self.publisher_.publish(msg)
self.fileready = False
def listen(self):
self.get_logger().info("Listening..")
with sd.InputStream(channels=1, callback=self.callback, blocksize=int(SampleRate * BlockSize / 1000), samplerate=SampleRate):
while self.running : self.process()
def main(args=None):
rclpy.init(args=args)
sr = SpeechRecognition()
sr.listen()
rclpy.spin(sr)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
sr.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
该节点的实现思路是,通过sounddevice的回调,不断获取麦克风的实时音频数据。并对数据进行检测是否存在人声,然后送入whisper进行识别,最后把识别结果发到/voice/stt话题。需要注意的是,当使用中文识别时,识别结果有时会是繁体字,所以在编写控制节点时,也需要把这个因素考虑进去。
五、编写语音控制节点
语音控制节点节点接收语音识别结果,并把识别的结果转为控制指令,即可实现对机器人的控制,其文件路径在spark_ros2/src/spark_app/spark_voice/spark_voice/voice_nav.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from math import copysign
from rclpy.parameter import Parameter
class VoiceNav(Node):
def __init__(self):
super().__init__('voice_nav')
#rospy.on_shutdown(self.cleanup)
# Set a number of parameters affecting the robot's speed
self.max_speed = self.get_parameter_or('~max_speed', Parameter('aaa', Parameter.Type.DOUBLE, 0.4)).value
self.max_angular_speed = self.get_parameter_or('~max_angular_speed', Parameter('bbb', Parameter.Type.DOUBLE, 1.5)).value
self.speed = self.get_parameter_or("~start_speed", Parameter('ccc', Parameter.Type.DOUBLE, 0.1)).value
self.angular_speed = self.get_parameter_or("~start_angular_speed", Parameter('ddd', Parameter.Type.DOUBLE, 0.5)).value
self.linear_increment = self.get_parameter_or("~linear_increment", Parameter('eee', Parameter.Type.DOUBLE, 0.05)).value
self.angular_increment = self.get_parameter_or("~angular_increment", Parameter('fff', Parameter.Type.DOUBLE, 0.4)).value
# We don't have to run the script very fast
self.rate = self.get_parameter_or("~rate", Parameter('ggg', Parameter.Type.INTEGER, 5)).value
self.r = self.create_rate(self.rate)
self.STEPS = self.get_parameter_or("~steps", Parameter('hhh', Parameter.Type.INTEGER, 20)).value
self.stp_counts=0
# A flag to determine whether or not voice control is paused
self.paused = False
# Initialize the Twist message we will publish.
self.cmd_vel = Twist()
# Publish the Twist message to the cmd_vel topic
self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)
# Subscribe to the /voice/stt topic to receive voice commands.
self.sub1 = self.create_subscription(String, '/voice/stt', self.speech_callback, 10)
self.sub1
# A mapping from keywords or phrases to commands
self.keywords_to_command = {'stop': ['stop','Stop','停止','停'],
'forward': ['forward','Forward','前进','前進'],
'backward': ['backward','Backward','后退','後退'],
'turn left': ['left','Left','左转','左轉'],
'turn right': ['right','Right','右转','右轉']}
self.get_logger().info("Ready to receive voice commands")
self.create_timer(timer_period_sec=0.2, callback=self.timer_callback)
def timer_callback(self):
# We have to keep publishing the cmd_vel message if we want the robot to keep moving.
if rclpy.ok():
self.stp_counts+=1
if(self.stp_counts>=self.STEPS):
self.cmd_vel = Twist()
self.stp_counts=0
self.cmd_vel_pub.publish(self.cmd_vel)
def get_command(self, data):
# Attempt to match the recognized word or phrase to the
# keywords_to_command dictionary and return the appropriate
# command
print(data)
for (command, keywords) in self.keywords_to_command.items():
for word in keywords:
if data.find(word) > -1:
return command
def speech_callback(self, msg):
# Get the motion command from the recognized phrase
command = self.get_command(msg.data)
self.stp_counts=0
# Log the command to the screen
self.get_logger().info("Command: " + str(command))
# If the user has asked to pause/continue voice control,
# set the flag accordingly
if command == 'pause':
self.paused = True
elif command == 'continue':
self.paused = False
# If voice control is paused, simply return without
# performing any action
if self.paused:
return
# The list of if-then statements should be fairly
# self-explanatory
if command == 'forward':
self.cmd_vel.linear.x = self.speed
self.cmd_vel.angular.z = 0.0
elif command == 'rotate left':
self.cmd_vel.linear.x = 0.0
self.cmd_vel.angular.z = self.angular_speed
elif command == 'rotate right':
self.cmd_vel.linear.x = 0.0
self.cmd_vel.angular.z = -self.angular_speed
elif command == 'turn left':
if self.cmd_vel.linear.x != 0:
self.cmd_vel.angular.z += self.angular_increment
else:
self.cmd_vel.angular.z = self.angular_speed
elif command == 'turn right':
if self.cmd_vel.linear.x != 0:
self.cmd_vel.angular.z -= self.angular_increment
else:
self.cmd_vel.angular.z = -self.angular_speed
elif command == 'backward':
self.cmd_vel.linear.x = -self.speed
self.cmd_vel.angular.z = 0.0
elif command == 'stop':
# Stop the robot! Publish a Twist message consisting of all zeros.
self.cmd_vel = Twist()
elif command == 'faster':
self.speed += self.linear_increment
self.angular_speed += self.angular_increment
if self.cmd_vel.linear.x != 0:
self.cmd_vel.linear.x += copysign(self.linear_increment, self.cmd_vel.linear.x)
if self.cmd_vel.angular.z != 0:
self.cmd_vel.angular.z += copysign(self.angular_increment, self.cmd_vel.angular.z)
elif command == 'slower':
self.speed -= self.linear_increment
self.angular_speed -= self.angular_increment
if self.cmd_vel.linear.x != 0:
self.cmd_vel.linear.x -= copysign(self.linear_increment, self.cmd_vel.linear.x)
if self.cmd_vel.angular.z != 0:
self.cmd_vel.angular.z -= copysign(self.angular_increment, self.cmd_vel.angular.z)
elif command in ['quarter', 'half', 'full']:
if command == 'quarter':
self.speed = copysign(self.max_speed / 4, self.speed)
elif command == 'half':
self.speed = copysign(self.max_speed / 2, self.speed)
elif command == 'full':
self.speed = copysign(self.max_speed, self.speed)
if self.cmd_vel.linear.x != 0:
self.cmd_vel.linear.x = copysign(self.speed, self.cmd_vel.linear.x)
if self.cmd_vel.angular.z != 0:
self.cmd_vel.angular.z = copysign(self.angular_speed, self.cmd_vel.angular.z)
else:
return
self.cmd_vel.linear.x = min(self.max_speed, max(-self.max_speed, self.cmd_vel.linear.x))
self.cmd_vel.angular.z = min(self.max_angular_speed, max(-self.max_angular_speed, self.cmd_vel.angular.z))
def cleanup(self):
# When shutting down be sure to stop the robot!
twist = Twist()
self.cmd_vel_pub.publish(twist)
rclpy.sleep(1)
def main(args=None):
rclpy.init(args=args)
g_node = VoiceNav()
g_node.get_logger().info("=============spin")
rclpy.spin(g_node)
g_node.destroy_node()
rclpy.shutdown()
if __name__=="__main__":
try:
main()
except KeyboardInterrupt:
pass
六、使用语音控制小海龟移动
我们先来模拟一下控制小海龟移动,编写launch文件:
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
lang=LaunchConfiguration('language',default='zh')
print(lang)
return LaunchDescription([
DeclareLaunchArgument(
'language',
default_value='zh',
choices=['zh', 'en'],
description='Choice language'),
Node(
package='turtlesim',
executable='turtlesim_node',
name='sim'
),
Node(
package='spark_voice',
executable='local_asr',
name='local_asr',
parameters=[{'language':lang}]
),
Node(
package='spark_voice',
executable='voice_nav',
name='voice_nav',
remappings=[
('/cmd_vel', '/turtle1/cmd_vel'),
]
)
])
此时,运行launch文件,并对机器人说话,即可实现小海龟的控制:
七、使用语音控制小车移动
到这里,实现语音控制小车应该已经非常简音,我们只需要把海龟节点替换为真实机器人的节点即可,为了增加launch文件的灵活性,还可以添加一些参数,比如在启动脚本后面添加language:='en'即可实现英文识别:
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import (DeclareLaunchArgument, GroupAction,
IncludeLaunchDescription)
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration
import launch_ros.actions
from launch_ros.actions import Node
from launch.substitutions import PythonExpression
def generate_launch_description():
# Get the launch directory
spark_voice_dir = get_package_share_directory('spark_voice')
spark_bringup_dir = get_package_share_directory('spark_bringup')
# Create the launch configuration variables
serial_port = LaunchConfiguration('serial_port')
enable_arm_tel = LaunchConfiguration('enable_arm_tel')
arm_type_tel = LaunchConfiguration('arm_type_tel')
start_base = LaunchConfiguration('start_base')
start_camera = LaunchConfiguration('start_camera')
start_lidar = LaunchConfiguration('start_lidar')
camera_type_tel = LaunchConfiguration('camera_type_tel')
lidar_type_tel = LaunchConfiguration('lidar_type_tel')
dp_rgist = LaunchConfiguration('dp_rgist')
start_bringup_rviz = LaunchConfiguration('start_bringup_rviz')
language = LaunchConfiguration('language')
declare_serial_port = DeclareLaunchArgument(
'serial_port',
default_value='/dev/sparkBase',
description='serial port name:/dev/sparkBase or /dev/ttyUSBx')
declare_enable_arm_tel = DeclareLaunchArgument(
'enable_arm_tel',
default_value='false',
choices=['true', 'false'],
description='Whether to run arm')
declare_arm_type_tel = DeclareLaunchArgument(
'arm_type_tel',
default_value='uarm',
choices=['uarm', 'sagittarius_arm'],
description='arm name')
declare_start_base = DeclareLaunchArgument(
'start_base',
default_value='true',
choices=['true', 'false'],
description='Whether to run base')
declare_start_camera = DeclareLaunchArgument(
'start_camera',
default_value='true',
choices=['true', 'false'],
description='Whether to run camera')
declare_start_lidar = DeclareLaunchArgument(
'start_lidar',
default_value='true',
choices=['true', 'false'],
description='Whether to run lidar')
declare_camera_type_tel = DeclareLaunchArgument(
'camera_type_tel',
default_value='d435',
#choices=['d435', 'astra_pro'],
description='camera type')
declare_lidar_type_tel = DeclareLaunchArgument(
'lidar_type_tel',
default_value='ydlidar_g6',
#choices=['ydlidar_g2', 'ydlidar_g6'],
description='lidar type')
declare_dp_rgist = DeclareLaunchArgument(
'dp_rgist',
default_value='true',
choices=['true', 'false'],
description='Whether to run dp_rgist')
declare_start_bringup_rviz = DeclareLaunchArgument(
'start_bringup_rviz',
default_value='true',
choices=['true', 'false'],
description='Whether to start_bringup_rviz')
declare_language = DeclareLaunchArgument(
'language',
default_value='zh',
choices=['zh', 'en'],
description='Choice language')
# Specify the actions
driver_group = GroupAction([
IncludeLaunchDescription(
PythonLaunchDescriptionSource(os.path.join(spark_bringup_dir, 'launch',
'driver_bringup.launch.py')),
launch_arguments={'serial_port': serial_port,
'enable_arm_tel': enable_arm_tel,
'arm_type_tel': arm_type_tel,
'start_base' : start_base,
'start_camera': start_camera,
'start_lidar': start_lidar,
'camera_type_tel' : camera_type_tel,
'lidar_type_tel': lidar_type_tel,
'dp_rgist': dp_rgist,
'start_bringup_rviz' : start_bringup_rviz,}.items()),
])
local_asr_node = Node(
package='spark_voice',
executable='local_asr',
output='screen',
emulate_tty=True,
parameters=[{'language':language}]
)
voice_nav_node = Node(
package='spark_voice',
executable='voice_nav',
output='screen',
emulate_tty=True,
)
# Create the launch description and populate
ld = LaunchDescription()
ld.add_action(declare_serial_port)
ld.add_action(declare_enable_arm_tel)
ld.add_action(declare_arm_type_tel)
ld.add_action(declare_start_base)
ld.add_action(declare_start_lidar)
ld.add_action(declare_start_camera)
ld.add_action(declare_camera_type_tel)
ld.add_action(declare_lidar_type_tel)
ld.add_action(declare_dp_rgist)
ld.add_action(declare_start_bringup_rviz)
ld.add_action(declare_language)
ld.add_action(driver_group)
ld.add_action(local_asr_node)
ld.add_action(voice_nav_node)
return ld
|