python实现语音识别(讯飞开放平台)

讯飞平台使用

1.注册讯飞平台账号讯飞官网网址
2.打开讯飞控制台。
3.点击“创建新应用”。
在这里插入图片描述
4.输入“应用名称”,“应用分类”,“应用功能描述”(这些都是自定义的)。
5.创建成功后,记住“APPID”,“APISecret”,“APIKey”这三个关键。
在这里插入图片描述

python实现讯飞接口的语音识别

第一步:导入需要的依赖库

import websocket
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import pyaudio

如果有没有的依赖库,通过pip在Anaconda的配置虚拟环境进行依赖库的下载。
例如:

pip install pyaudio

第二步:声明全局变量

在Python中,全局变量是在程序的任何地方都可以访问的变量。
作用和意义:
	1.共享数据:全局变量能够在整个程序中共享数据,不受作用域的限制。
	2.存储常量或配置:全局变量可以用来存储常量或配置项,这些值在整个程序中都需要使用,并且不会变化。
	3.减少参数传递:全局变量可以减少函数或方法之间的参数传递。
存在的问题:
	1.命名空间污染:全局变量可能导致命名空间的污染,特别是在大型程序或模块化开发中。如果多个模块或函数定义了相同名称的全局变量,会导致冲突和意外的行为。
	2.难以追踪和维护:使用全局变量会使代码更难理解和维护。
因此,在使用全局变量时应谨慎,仅在必要的情况下使用。

声明的是音频

STATUS_FIRST_FRAME = 0  # 第一帧的标识
STATUS_CONTINUE_FRAME = 1  # 中间帧标识
STATUS_LAST_FRAME = 2  # 最后一帧的标识

第三步:初始化讯飞接口对象

class Ws_Param(object):
    # 初始化接口对象
    def __init__(self,APPID,APIKey,APISecret):
        # 初始化讯飞接口的ID,Key,Secret
        self.APPID=APPID
        self.APIKey=APIKey
        self.APISecret=APISecret
        # 公共参数(common)
        self.CommonArgs={
    
    "app_id":self.APPID}
        # 业务参数(business)
        self.BusinessArgs={
    
    "domain":"iat","language":"zh_cn",
                           "accent":"mandarin","vinfo":1,"vad_eos":1000}

    def create_url(self):
        # 生成url
        url='wss://ws-api.xfyun.cn/v2/iat'
        now=datetime.now()
        date=format_date_time(mktime(now.timetuple()))
        # 生成RFC1123格式的时间戳
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
        # 拼接字符串
        signature_sha = hmac.new(self.APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
        # 进行hmac_sha256进行加密
        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", " \
                               "signature=\"%s\"" % (self.APIKey, "hmac-sha256",
                                                     "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        v={
    
    
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
        }
        # 将请求的鉴权参数组合为字典
        url=url+'?'+urlencode(v)
        # 拼接鉴权参数,生成url
        return url

第四步:收到websocket建立连接后的处理函数

def on_open(ws):
    # 收到websocket连接建立的处理
    def run(*args):
        # 在线音频处理并发送到讯飞
        status=STATUS_FIRST_FRAME
        # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
        CHUNK = 520  # 定义数据流块
        FORMAT = pyaudio.paInt16  # 16bit编码格式
        CHANNELS = 1  # 单声道
        RATE = 16000  # 16000采样频率
        p=pyaudio.PyAudio()  # 录音
        # 实例化pyaudio对象
        stream = p.open(format=FORMAT,  # 音频流wav格式
                        channels=CHANNELS,  # 单声道
                        rate=RATE,  # 采样率16000
                        input=True,
                        frames_per_buffer=CHUNK)
        # 创建音频流,使用这个对象去打开声卡,设置采样深度、通道数、采样率、输入和采样点缓存数量
        print("---------------开始录音-----------------")
        # 开始录音
        global text
        for i in range(0,int(RATE/CHUNK*60)):
            # 录制特定时间的音频
            buf=stream.read(CHUNK)
            # 读出声卡缓冲区的音频数据
            if not buf:
                status=STATUS_LAST_FRAME
            if status==STATUS_FIRST_FRAME:
                # 首帧处理
                d = {
    
    "common": wsParam.CommonArgs,
                     "business": wsParam.BusinessArgs,
                     "data": {
    
    "status": 0, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                d = json.dumps(d)
                # 将拼接的字符串d数据结构转换为json
                ws.send(d)
                status=STATUS_CONTINUE_FRAME
            elif status==STATUS_CONTINUE_FRAME:
                # 中间帧处理
                d = {
    
    "data": {
    
    "status": 1, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                ws.send(json.dumps(d))
            elif status==STATUS_LAST_FRAME:
                # 最后一帧处理
                d = {
    
    "data": {
    
    "status": 2, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                ws.send(json.dumps(d))
                time.sleep(1)
                break
    thread.start_new_thread(run,())

第五步:收到websocket消息的处理函数

def on_message(ws,message):
    # 收到websocket消息的正常处理
    try:
        # print(json.loads(message))
        code = json.loads(message)["code"]
        # 解码返回的message的json数据中的code
        sid = json.loads(message)["sid"]
        if code != 0:
            errMsg = json.loads(message)["message"]
            # 解码message中错误信息
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
        else:
            data = json.loads(message)["data"]["result"]["ws"]
            # 解码message中ws数据
            result = ""
            for i in data:
                for w in i["cw"]:
                    result += w["w"]

            if result == '。' or result == '.。' or result == ' .。' or result == ' 。':
                pass
            else:
                # t.insert(END, result)  # 把上边的标点插入到result的最后
                print("翻译结果: %s。" % (result))
                global recording_results
                recording_results=result
    except Exception as e:
        # 异常处理,参数异常
        print("receive msg,but parse exception:", e)

def on_error(ws,error):
    # 收到websocket后错误的处理
    print("### error ### : ",error)
    run()
    # 重新启动监听

def on_close(ws):
    # 收到websocket关闭的处理
    pass

第六步:整合运行各函数

在这里面有三个关键的地方:“APPID”,“APIKey”,“APISecret”。
将上面讯飞控制台应用项目中的“APPID”,“APIKey”,“APISecret”粘贴到代码中。

def run():
    global wsParam
    wsParam=Ws_Param(APPID='5d27db6',
                    APIKey='d61163a9bdb5d0508f98dee66e0383',
                    APISecret='ZTQ5NTAwZTk05MDdhNWViZjcyYjVh')
    # 初始化讯飞接口编码
    
    websocket.enableTrace(False)
    # True表示默认在控制台打印连接和信息发送接收情况
    
    wsUrl=wsParam.create_url()
    # 生成讯飞的url进行连接
    
    ws=websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close)
    ws.on_open=on_open  # 进行websocket连接
    
    ws.run_forever(sslopt={
    
    "cert_reqs": ssl.CERT_NONE}, ping_timeout=2)
    # 无限循环,只要这个websocket连接未断开,这个循环就会一直进行下去

开始运行。

if __name__ == '__main__':
	run()

完整代码

''' 在线语音识别 '''
import websocket
import hashlib
import base64
import hmac
import json
from urllib.parse import urlencode
import time
import ssl
from wsgiref.handlers import format_date_time
from datetime import datetime
from time import mktime
import _thread as thread
import pyaudio


recording_results=""   # 识别结果
STATUS_FIRST_FRAME = 0  # 第一帧的标识
STATUS_CONTINUE_FRAME = 1  # 中间帧标识
STATUS_LAST_FRAME = 2  # 最后一帧的标识

class Ws_Param(object):
    # 初始化接口对象
    def __init__(self,APPID,APIKey,APISecret):
        # 初始化讯飞接口的ID,Key,Secret
        self.APPID=APPID
        self.APIKey=APIKey
        self.APISecret=APISecret
        # 公共参数(common)
        self.CommonArgs={
    
    "app_id":self.APPID}
        # 业务参数(business)
        self.BusinessArgs={
    
    "domain":"iat","language":"zh_cn",
                           "accent":"mandarin","vinfo":1,"vad_eos":1000}

    def create_url(self):
        # 生成url
        url='wss://ws-api.xfyun.cn/v2/iat'
        now=datetime.now()
        date=format_date_time(mktime(now.timetuple()))
        # 生成RFC1123格式的时间戳
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
        # 拼接字符串
        signature_sha = hmac.new(self.APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
        # 进行hmac_sha256进行加密
        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", " \
                               "signature=\"%s\"" % (self.APIKey, "hmac-sha256",
                                                     "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        v={
    
    
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
        }
        # 将请求的鉴权参数组合为字典
        url=url+'?'+urlencode(v)
        # 拼接鉴权参数,生成url
        return url


def on_open(ws):
    # 收到websocket连接建立的处理
    def run(*args):
        # 在线音频处理并发送到讯飞
        status=STATUS_FIRST_FRAME
        # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
        CHUNK = 520  # 定义数据流块
        FORMAT = pyaudio.paInt16  # 16bit编码格式
        CHANNELS = 1  # 单声道
        RATE = 16000  # 16000采样频率
        p=pyaudio.PyAudio()  # 录音
        # 实例化pyaudio对象
        stream = p.open(format=FORMAT,  # 音频流wav格式
                        channels=CHANNELS,  # 单声道
                        rate=RATE,  # 采样率16000
                        input=True,
                        frames_per_buffer=CHUNK)
        # 创建音频流,使用这个对象去打开声卡,设置采样深度、通道数、采样率、输入和采样点缓存数量
        print("---------------开始录音-----------------")
        # 开始录音
        global text
        for i in range(0,int(RATE/CHUNK*60)):
            # 录制特定时间的音频
            buf=stream.read(CHUNK)
            # 读出声卡缓冲区的音频数据
            if not buf:
                status=STATUS_LAST_FRAME
            if status==STATUS_FIRST_FRAME:
                # 首帧处理
                d = {
    
    "common": wsParam.CommonArgs,
                     "business": wsParam.BusinessArgs,
                     "data": {
    
    "status": 0, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                d = json.dumps(d)
                # 将拼接的字符串d数据结构转换为json
                ws.send(d)
                status=STATUS_CONTINUE_FRAME
            elif status==STATUS_CONTINUE_FRAME:
                # 中间帧处理
                d = {
    
    "data": {
    
    "status": 1, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                ws.send(json.dumps(d))
            elif status==STATUS_LAST_FRAME:
                # 最后一帧处理
                d = {
    
    "data": {
    
    "status": 2, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
                ws.send(json.dumps(d))
                time.sleep(1)
                break
    thread.start_new_thread(run,())


def on_message(ws,message):
    # 收到websocket消息的正常处理
    try:
        # print(json.loads(message))
        code = json.loads(message)["code"]
        # 解码返回的message的json数据中的code
        sid = json.loads(message)["sid"]
        if code != 0:
            errMsg = json.loads(message)["message"]
            # 解码message中错误信息
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
        else:
            data = json.loads(message)["data"]["result"]["ws"]
            # 解码message中ws数据
            result = ""
            for i in data:
                for w in i["cw"]:
                    result += w["w"]

            if result == '。' or result == '.。' or result == ' .。' or result == ' 。':
                pass
            else:
                # t.insert(END, result)  # 把上边的标点插入到result的最后
                print("翻译结果: %s。" % (result))
                global recording_results
                recording_results=result
    except Exception as e:
        # 异常处理,参数异常
        print("receive msg,but parse exception:", e)

def on_error(ws,error):
    # 收到websocket后错误的处理
    print("### error ### : ",error)
    run()
    # 重新启动监听

def on_close(ws):
    # 收到websocket关闭的处理
    pass

def run():
    global wsParam
    wsParam=Ws_Param(APPID='5d27dbc6',
                    APIKey='d61163a9bdb5d0a0508f98dee66e0383',
                    APISecret='ZTQ5NTAwZTk0YzQ5MDdhNWViZjcyYjVh')
    # 初始化讯飞接口编码
    websocket.enableTrace(False)
    # True表示默认在控制台打印连接和信息发送接收情况
    wsUrl=wsParam.create_url()
    # 生成讯飞的url进行连接
    ws=websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close)
    ws.on_open=on_open  # 进行websocket连接
    ws.run_forever(sslopt={
    
    "cert_reqs": ssl.CERT_NONE}, ping_timeout=2)
    # 无限循环,只要这个websocket连接未断开,这个循环就会一直进行下去

if __name__ == '__main__':
	run()

猜你喜欢

转载自blog.csdn.net/weixin_48434899/article/details/131543126