@
目录在1.2官网注册后拿到APISecret和APIKey,直接复制文章2.5demo代码,保存为real_time_audio_recognition.py,在命令行执行
python real_time_audio_recognition.py -client_secret=您的client_secret -client_id=您的client_id -file_path=test.wav --audio_format=wav --sample_rate=16000
使用中有任何问题,欢迎留言提问。
Python 3
标贝科技 https://ai.data-baker.com/#/index
填写邀请码fwwqgs,每日免费调用量还可以翻倍
点击产品地址进行登录,支持短信、密码、微信三种方式登录。
登录后进入【首页概览】,各位开发者可以进行创建多个应用。包括一句话识别、长语音识别、录音文件识别;在线合成、离线合成、长文本合成。
进入【已创建的应用】,左侧选择您需调用的AI技术服务,右侧展示对应服务页面概览(您可查询用量、管理套餐、购买服务量、自主获取授权、预警管理)。
通过服务 / 授权管理,获取对应参数,进行开发配置(获取访问令牌token)
拿到Key和Secret就可以正式使用啦!
在拿到Key和Secret后,我们还需要调用授权接口获取access_token,这个access_token有效时长是24小时。
# 获取access_token用于鉴权 def get_access_token(client_secret, client_id): grant_type = "client_credentials" url = "https://openapi.data-baker.com/oauth/2.0/token?grant_type={}&client_secret={}&client_id={}" \ .format(grant_type, client_secret, client_id) try: response = requests.post(url) response.raise_for_status() except Exception as e: print(response.text) raise Exception else: access_token = json.loads(response.text).get('access_token') return access_token
需要根据接口要求设置参数,并且对音频数据进行分割
# 准备数据 def prepare_data(args, access_token): # 读取音频文件 with open(args.file_path, 'rb') as f: file = f.read() # 填写Header信息 audio_format = args.audio_format sample_rate = args.sample_rate splited_data = [str(base64.b64encode(file[i:i + 5120]), encoding='utf-8') for i in range(0, len(file), 5120)] asr_params = {"audio_format": audio_format, "sample_rate": int(sample_rate), "speech_type": 1} json_list = [] for i in range(len(splited_data)): if i != len(splited_data) - 1: asr_params['req_idx'] = i else: asr_params['req_idx'] = -len(splited_data) + 1 asr_params["audio_data"] = splited_data[i] data = {"access_token": access_token, "version": "1.0", "asr_params": asr_params} json_list.append(json.dumps(data)) return json_list
client_secret和client_id:在文章1.2的官网获取,必填
file_save_path:文件保存路径,必填
audio_format:音频类型,默认wav格式
sample_rate:采样率,默认16000Hz
# 获取命令行输入参数 def get_args(): parser = argparse.ArgumentParser(description='ASR') parser.add_argument('-client_secret', type=str, required=True) parser.add_argument('-client_id', type=str, required=True) parser.add_argument('-file_path', type=str, required=True) parser.add_argument('--audio_format', type=str, default='wav') parser.add_argument('--sample_rate', type=str, default='16000') args = parser.parse_args() return args
class Client: def __init__(self, data, uri): self.data = data self.uri = uri #建立连接 def connect(self): ws_app = websocket.WebSocketApp(uri, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) ws_app.run_forever() # 建立连接后发送消息 def on_open(self, ws): print("sending..") for i in range(len(self.data)): ws.send(self.data[i]) # 接收消息 def on_message(self, ws, message): code = json.loads(message).get("code") if code != 90000: # 打印接口错误 print(message) if json.loads(message).get('end_flag') == 1: print(json.loads(message).get('asr_text')) # 打印错误 def on_error(slef, ws, error): print("error: ", str(error)) # 关闭连接 def on_close(ws): print("client closed.")
import argparse import json import base64 import requests import websocket class Client: def __init__(self, data, uri): self.data = data self.uri = uri #建立连接 def connect(self): ws_app = websocket.WebSocketApp(uri, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close) ws_app.run_forever() # 建立连接后发送消息 def on_open(self, ws): print("sending..") for i in range(len(self.data)): ws.send(self.data[i]) # 接收消息 def on_message(self, ws, message): code = json.loads(message).get("code") if code != 90000: # 打印接口错误 print(message) if json.loads(message).get('end_flag') == 1: print(json.loads(message).get('asr_text')) # 打印错误 def on_error(slef, ws, error): print("error: ", str(error)) # 关闭连接 def on_close(ws): print("client closed.") # 准备数据 def prepare_data(args, access_token): # 读取音频文件 with open(args.file_path, 'rb') as f: file = f.read() # 填写Header信息 audio_format = args.audio_format sample_rate = args.sample_rate splited_data = [str(base64.b64encode(file[i:i + 5120]), encoding='utf-8') for i in range(0, len(file), 5120)] asr_params = {"audio_format": audio_format, "sample_rate": int(sample_rate), "speech_type": 1} json_list = [] for i in range(len(splited_data)): if i != len(splited_data) - 1: asr_params['req_idx'] = i else: asr_params['req_idx'] = -len(splited_data) + 1 asr_params["audio_data"] = splited_data[i] data = {"access_token": access_token, "version": "1.0", "asr_params": asr_params} json_list.append(json.dumps(data)) return json_list # 获取命令行输入参数 def get_args(): parser = argparse.ArgumentParser(description='ASR') parser.add_argument('-client_secret', type=str, required=True) parser.add_argument('-client_id', type=str, required=True) parser.add_argument('-file_path', type=str, required=True) parser.add_argument('--audio_format', type=str, default='wav') parser.add_argument('--sample_rate', type=str, default='16000') args = parser.parse_args() return args # 获取access_token用于鉴权 def get_access_token(client_secret, client_id): grant_type = "client_credentials" url = "https://openapi.data-baker.com/oauth/2.0/token?grant_type={}&client_secret={}&client_id={}" \ .format(grant_type, client_secret, client_id) try: response = requests.post(url) response.raise_for_status() except Exception as e: print(response.text) raise Exception else: access_token = json.loads(response.text).get('access_token') return access_token if __name__ == '__main__': try: args = get_args() # 获取access_token client_secret = args.client_secret client_id = args.client_id access_token = get_access_token(client_secret, client_id) # 准备数据 data = prepare_data(args, access_token) uri = "wss://openapi.data-baker.com/asr/realtime" # 建立Websocket连接 client = Client(data, uri) client.connect() except Exception as e: print(e)
复制所有代码,确定音频为wav格式,采样率为16K,在命令行执行
python real_time_audio_recognition.py -client_secret=您的client_secret -client_id=您的client_id -file_path=test.wav --audio_format=wav --sample_rate=16000
填写邀请码fwwqgs,每日免费调用量还可以翻倍