参考:
chatglm2 api使用:
https://blog.csdn.net/weixin_42357472/article/details/130342799?spm=1001.2014.3001.5501
sherpa语音转文本识别:
https://blog.csdn.net/weixin_42357472/article/details/131269539?spm=1001.2014.3001.5502
tts播报:
https://blog.csdn.net/weixin_42357472/article/details/132256328?spm=1001.2014.3001.5501
框架流程
自定义一个唤醒词(这里:小乐小乐)=》通过sherpa语音识别转文字(如果识别到唤醒词处理)=》文字给到LLM大模型处理=》处理结果给到tts声音播报
如果要实时tts播报 LLM大模型返回的实时结果,可以参考(主要就是要api接受服务器实时sse协议内容):https://blog.csdn.net/weixin_42357472/article/details/132336046
代码
#!/usr/bin/env python3
# Real-time speech recognition from a microphone with sherpa-ncnn Python API
#
# Please refer to
# https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
# to download pre-trained models
import sys
try:
import sounddevice as sd
except ImportError as e:
print("Please install sounddevice first. You can use")
print()
print(" pip install sounddevice")
print()
print("to install it")
sys.exit(-1)
import sherpa_ncnn
import pyttsx3
def create_recognizer():
    """Build a streaming sherpa-ncnn recognizer from local model files.

    See https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
    for download links; swap `model_dir` to try another pre-trained model.
    """
    # Alternative models (uncomment one to switch):
    # model_dir = "sherpa-ncnn-conv-emformer-transducer-2022-12-06"
    # model_dir = "sherpa-ncnn-lstm-transducer-small-2023-02-13"
    model_dir = "sherpa-ncnn-streaming-zipformer-bilingual-zh-en-2023-02-13"
    # model_dir = "sherpa-ncnn-streaming-zipformer-small-bilingual-zh-en-2023-02-16"
    # model_dir = "sherpa-ncnn-streaming-zipformer-20M-2023-02-17"
    return sherpa_ncnn.Recognizer(
        tokens=f"./{model_dir}/tokens.txt",
        encoder_param=f"./{model_dir}/encoder_jit_trace-pnnx.ncnn.param",
        encoder_bin=f"./{model_dir}/encoder_jit_trace-pnnx.ncnn.bin",
        decoder_param=f"./{model_dir}/decoder_jit_trace-pnnx.ncnn.param",
        decoder_bin=f"./{model_dir}/decoder_jit_trace-pnnx.ncnn.bin",
        joiner_param=f"./{model_dir}/joiner_jit_trace-pnnx.ncnn.param",
        joiner_bin=f"./{model_dir}/joiner_jit_trace-pnnx.ncnn.bin",
        num_threads=4,
    )
import requests
import json
def chatglm(payload):
    """POST `payload` to the chatglm2 API server and return the parsed JSON reply.

    Args:
        payload: JSON-serializable dict, e.g. {"prompt": ..., "history": ...}.

    Returns:
        dict decoded from the server's JSON response; callers expect the
        keys "response" and "history".

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: if the reply body is not valid JSON.
    """
    url = "http://192*****4:8000"  # chatglm2 API endpoint (redacted in the article)
    # `json=` serializes the payload and sets the Content-Type header for us,
    # replacing the manual json.dumps + headers dance. A timeout keeps the
    # voice loop from hanging forever if the server is unreachable.
    response = requests.post(url, json=payload, timeout=60)
    response.raise_for_status()
    return response.json()
def main():
    """Run the wake-word → speech-to-text → LLM → TTS loop forever."""
    # LLM role definition: seed history turn that constrains the answers.
    history = [["你名字叫*****;每次回答请都简要回答不超过30个字", "好的,小乐很乐意为你服务"]]
    print("Started! Please speak")
    recognizer = create_recognizer()
    sample_rate = recognizer.sample_rate
    # Read 3 seconds of audio per blocking read. (The upstream sherpa-ncnn
    # example used 0.1 s; the stale "100 ms" comment is gone — a longer
    # window lets a whole utterance accumulate before wake-word scanning.)
    samples_per_read = int(3 * sample_rate)
    print(samples_per_read, sample_rate)
    last_result = ""
    awaiting_query = True  # was `j == 0`: next non-wake chunk is the user's query
    # Check the longer variant first so "小乐小乐小乐" is not matched as "小乐小乐".
    wake_words = ("小乐小乐小乐", "小乐小乐")
    with sd.InputStream(channels=1, dtype="float32", samplerate=sample_rate) as s:
        while True:
            samples, _ = s.read(samples_per_read)  # a blocking read
            print("##" * 18)
            samples = samples.reshape(-1)
            print(samples.shape, samples)
            recognizer.accept_waveform(sample_rate, samples)
            result = recognizer.text
            # Streaming recognition only appends text; process the new tail.
            if last_result == result:
                continue
            words = result[len(last_result):]
            print("words:", words)
            for wake in wake_words:
                if wake in words:
                    pyttsx3.speak("在的呢")
                    # Echo whatever was said after the wake word, if anything.
                    new_word = words[words.index(wake) + len(wake):]
                    print("new_word:", new_word)
                    if new_word:
                        pyttsx3.speak(new_word)
                    last_result = result
                    awaiting_query = True
                    break
            else:
                if awaiting_query:
                    print("speak:", words)
                    pyttsx3.speak(words)
                    last_result = result
                    awaiting_query = False
                    # Hand the recognized text to the LLM and read the answer
                    # aloud; see the article's last section for the streaming
                    # (SSE) variant.
                    results = chatglm({"prompt": words, "history": history})
                    print(results)
                    pyttsx3.speak(results["response"])
                    history = results["history"]
if __name__ == "__main__":
    devices = sd.query_devices()
    sd.default.device[0] = 0  # force input device index 0
    print(len(devices), devices, sd.default.device)
    input_device_info = sd.query_devices(kind="input")
    channels = input_device_info["max_input_channels"]
    # Report how many input channels the selected device offers.
    print(f"输入设备的通道数: {channels}")
    # default_input_device_idx = sd.default.device[0]
    # print(f'Use default device: {devices[default_input_device_idx]["name"]}')
    print(sd.default.channels)
    try:
        main()
    except KeyboardInterrupt:
        print("\nCaught Ctrl + C. Exiting")
实时流api播报,参考:https://blog.csdn.net/weixin_42357472/article/details/132336046
import httpx
import asyncio
async def chatglm_chat(word):
    """Stream the chatglm2 answer over SSE and speak it clause by clause."""
    url = "http://192*****4:8000"
    data = {
        "input": word,
        "max_length": 2048,
        "top_p": 0.7,
        "temperature": 0.95,
        "history": [["你名字叫******,让世界更安全;每次回答请都简要回答不超过30个字","好的,小***乐意为你服务"]],
        "html_entities": True,
    }
    # Clause delimiters: speak as soon as a complete clause has arrived.
    delimiters = (",", ":", "。", "、", "!", ",")
    spoken_len = 0  # number of characters already handed to the TTS engine
    async with httpx.AsyncClient() as client:
        async with client.stream("POST", url, json=data) as response:
            async for line in response.aiter_lines():
                print(line)
                payload = line[6:]  # strip the SSE "data: " prefix
                if spoken_len == 0:
                    # Nothing spoken yet: speak the whole accumulated text.
                    if any(ch in payload for ch in delimiters):
                        pyttsx3.speak(payload)
                        spoken_len += len(payload)
                else:
                    # Only the part after what was already spoken.
                    pending = payload[spoken_len:]
                    if any(ch in pending for ch in delimiters):
                        pyttsx3.speak(pending)
                        spoken_len += len(pending)


# Invoke the coroutine from the recognition loop:
asyncio.run(chatglm_chat(words))
完整代码:
#!/usr/bin/env python3
# Real-time speech recognition from a microphone with sherpa-ncnn Python API
#
# Please refer to
# https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
# to download pre-trained models
import sys
try:
import sounddevice as sd
except ImportError as e:
print("Please install sounddevice first. You can use")
print()
print(" pip install sounddevice")
print()
print("to install it")
sys.exit(-1)
import sherpa_ncnn
import pyttsx3
def create_recognizer():
    """Construct the streaming sherpa-ncnn recognizer used by main().

    Model download links:
    https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
    """
    # Pick one of the pre-trained models (uncomment to switch):
    # base_file = "sherpa-ncnn-conv-emformer-transducer-2022-12-06"
    # base_file = "sherpa-ncnn-lstm-transducer-small-2023-02-13"
    base_file = "sherpa-ncnn-streaming-zipformer-bilingual-zh-en-2023-02-13"
    # base_file = "sherpa-ncnn-streaming-zipformer-small-bilingual-zh-en-2023-02-16"
    # base_file = "sherpa-ncnn-streaming-zipformer-20M-2023-02-17"
    path = "./{}/{}".format  # helper: model-directory-relative file path
    return sherpa_ncnn.Recognizer(
        tokens=path(base_file, "tokens.txt"),
        encoder_param=path(base_file, "encoder_jit_trace-pnnx.ncnn.param"),
        encoder_bin=path(base_file, "encoder_jit_trace-pnnx.ncnn.bin"),
        decoder_param=path(base_file, "decoder_jit_trace-pnnx.ncnn.param"),
        decoder_bin=path(base_file, "decoder_jit_trace-pnnx.ncnn.bin"),
        joiner_param=path(base_file, "joiner_jit_trace-pnnx.ncnn.param"),
        joiner_bin=path(base_file, "joiner_jit_trace-pnnx.ncnn.bin"),
        num_threads=4,
    )
import requests
import json
def chatglm(payload):
    """POST `payload` to the chatglm2 API server and return the parsed JSON reply.

    Args:
        payload: JSON-serializable dict, e.g. {"prompt": ..., "history": ...}.

    Returns:
        dict decoded from the server's JSON response.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: if the reply body is not valid JSON.
    """
    url = "http://192.168.19.14:8000"  # chatglm2 API endpoint on the LAN
    # `json=` serializes the payload and sets Content-Type automatically,
    # replacing the manual json.dumps + headers dance. A timeout keeps the
    # voice loop from hanging forever if the server is unreachable.
    response = requests.post(url, json=payload, timeout=60)
    response.raise_for_status()
    return response.json()
import httpx
import asyncio
async def chatglm_chat(word):
    """Stream the chatglm2 answer over SSE and speak it clause by clause."""
    url = "http://192.168.19.14:8000"
    data = {
        "input": word,
        "max_length": 2048,
        "top_p": 0.7,
        "temperature": 0.95,
        "history": [["你名字****过30个字","好的,小杰很乐意为你服务"]],
        "html_entities": True,
    }
    # Clause delimiters: speak as soon as a complete clause has arrived.
    delimiters = (",", ":", "。", "、", "!", ",")
    spoken_len = 0  # number of characters already handed to the TTS engine
    async with httpx.AsyncClient() as client:
        async with client.stream("POST", url, json=data) as response:
            async for line in response.aiter_lines():
                print(line)
                payload = line[6:]  # strip the SSE "data: " prefix
                if spoken_len == 0:
                    # Nothing spoken yet: speak the whole accumulated text.
                    if any(ch in payload for ch in delimiters):
                        pyttsx3.speak(payload)
                        spoken_len += len(payload)
                else:
                    # Only the part after what was already spoken.
                    pending = payload[spoken_len:]
                    if any(ch in pending for ch in delimiters):
                        pyttsx3.speak(pending)
                        spoken_len += len(pending)
def main():
    """Run the wake-word → speech-to-text → streaming-LLM → TTS loop forever."""
    # LLM role definition; chatglm_chat() carries its own copy, so this one
    # is only kept for the non-streaming chatglm() variant.
    history = [["你名字叫*******过30个字", "好的,小杰很乐意为你服务"]]
    print("Started! Please speak")
    recognizer = create_recognizer()
    sample_rate = recognizer.sample_rate
    # Read 5 seconds of audio per blocking read. (The upstream sherpa-ncnn
    # example used 0.1 s; the stale "100 ms" comment is gone — a longer
    # window lets a whole utterance accumulate before wake-word scanning.)
    samples_per_read = int(5 * sample_rate)
    print(samples_per_read, sample_rate)
    last_result = ""
    awaiting_query = True  # was `j == 0`: next non-wake chunk is the user's query
    # Check the longer variant first so "小杰小杰小杰" is not matched as "小杰小杰".
    wake_words = ("小杰小杰小杰", "小杰小杰")
    with sd.InputStream(channels=1, dtype="float32", samplerate=sample_rate) as s:
        while True:
            samples, _ = s.read(samples_per_read)  # a blocking read
            print("##" * 18)
            samples = samples.reshape(-1)
            print(samples.shape, samples)
            recognizer.accept_waveform(sample_rate, samples)
            result = recognizer.text
            # Streaming recognition only appends text; process the new tail.
            if last_result == result:
                continue
            words = result[len(last_result):]
            print("words:", words)
            for wake in wake_words:
                if wake in words:
                    pyttsx3.speak("在的呢")
                    # Echo whatever was said after the wake word, if anything.
                    new_word = words[words.index(wake) + len(wake):]
                    print("new_word:", new_word)
                    if new_word:
                        pyttsx3.speak(new_word)
                    last_result = result
                    awaiting_query = True
                    break
            else:
                if awaiting_query:
                    print("speak:", words)
                    # pyttsx3.speak(words)
                    last_result = result
                    awaiting_query = False
                    # Stream the LLM answer over SSE and speak it clause by
                    # clause as it arrives.
                    asyncio.run(chatglm_chat(words))
if __name__ == "__main__":
    devices = sd.query_devices()
    sd.default.device[0] = 0  # force input device index 0
    print(len(devices), devices, sd.default.device)
    input_device_info = sd.query_devices(kind="input")
    channels = input_device_info["max_input_channels"]
    # Report how many input channels the selected device offers.
    print(f"输入设备的通道数: {channels}")
    # default_input_device_idx = sd.default.device[0]
    # print(f'Use default device: {devices[default_input_device_idx]["name"]}')
    print(sd.default.channels)
    try:
        main()
    except KeyboardInterrupt:
        print("\nCaught Ctrl + C. Exiting")