Files
2025-04-03 06:21:55 +00:00

113 lines
3.1 KiB
Python

import whisper
import os
import json
import redis
from dotenv import load_dotenv
from kafka import KafkaConsumer
import asyncio
# Runtime environment tweaks. Note that in this file they execute after the
# import statements above, not before them.
# NOTE(review): presumably these must be in place before the tokenizer / CUDA
# runtimes initialise -- confirm they still take effect at this position.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# ID of the GPU to use; change this value to select a different device.
GPU_ID = 1
# Restrict CUDA to the selected device via CUDA_VISIBLE_DEVICES.
os.environ["CUDA_VISIBLE_DEVICES"] = str(GPU_ID)

# Load variables from a .env file into the process environment.
load_dotenv()

print("正在加载Whisper模型...")
model = whisper.load_model("small")
print("Whisper模型加载完成。")


def _require_int_env(name):
    """Return the environment variable *name* parsed as an integer.

    A bare ``int(os.getenv(name))`` fails with an unhelpful
    ``TypeError`` about ``NoneType`` when the variable is missing; this
    helper reports which variable is at fault instead.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The variable's value converted with ``int``.

    Raises:
        RuntimeError: If the variable is unset, blank, or not an integer.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        raise RuntimeError(f"Required environment variable {name} is not set")
    try:
        return int(raw)
    except ValueError as err:
        raise RuntimeError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from err


# Kafka configuration (values may be None when the variables are unset).
KAFKA_BROKER = os.getenv('KAFKA_BROKER')
KAFKA_TOPIC = os.getenv('KAFKA_ASR_TOPIC')

# Redis configuration.
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PORT = _require_int_env('REDIS_PORT')
REDIS_ASR_DB = _require_int_env('REDIS_ASR_DB')
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD')
REDIS_TASK_DB = _require_int_env('REDIS_TASK_DB')

# Redis clients: one for the ASR database (REDIS_ASR_DB) and one for the
# task database (REDIS_TASK_DB), both on the same host/port/password.
redis_asr_client = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_ASR_DB,
    password=REDIS_PASSWORD,
)
redis_task_client = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_TASK_DB,
    password=REDIS_PASSWORD,
)
async def process_audio(file_path: str, cache_key: str):
    """Transcribe one audio file and publish the transcript through Redis.

    The function is declared ``async`` but contains no ``await``; every
    call below runs synchronously.

    Steps, in order:
      1. Set ``task_status:<cache_key>`` to ``"processing"``.
      2. Transcribe ``file_path`` with the module-level Whisper ``model``.
      3. Store the transcript under ``cache_key`` with a 3600-second expiry
         and publish it as JSON on the ``asr_results`` channel.
      4. Set ``task_status:<cache_key>`` to ``"completed"``.
      5. Delete the audio file.

    Any exception in steps 1-4 is printed and the task status is set to
    ``"error"``; the audio file is left in place in that case. A failure
    while deleting the file (step 5) is printed but does not change the
    already-recorded ``"completed"`` status.

    Args:
        file_path: Path of the audio file to transcribe.
        cache_key: Redis key under which the transcript is stored; also
            used as the suffix of the task-status key.
    """
    status_key = f"task_status:{cache_key}"

    try:
        # Mark the task as in progress.
        redis_task_client.set(status_key, "processing")

        result = model.transcribe(file_path)
        transcription = result['text']
        print(f"处理了文件: {file_path}")
        print(f"转录结果: {transcription}")

        redis_asr_client.setex(cache_key, 3600, transcription)
        result_data = {
            'transcription': transcription
        }
        redis_asr_client.publish('asr_results', json.dumps(result_data))

        # Mark the task as finished.
        redis_task_client.set(status_key, "completed")
    except Exception as e:
        print(f"处理音频文件时发生错误: {str(e)}")
        # Record the failure so the task does not stay in "processing".
        redis_task_client.set(status_key, "error")
        return

    # Cleanup is kept outside the try block above on purpose: the transcript
    # has already been cached and published, so a failure to delete the
    # source file must not overwrite the "completed" status with "error".
    try:
        os.remove(file_path)
    except OSError as e:
        print(f"删除音频文件失败: {file_path}: {e}")
async def kafka_consumer():
    """Consume ASR tasks from Kafka and process them one at a time.

    Subscribes to ``KAFKA_TOPIC`` on ``KAFKA_BROKER`` as consumer group
    ``asr_group`` (earliest offset reset, auto-commit enabled) and iterates
    over incoming messages. Each message value is expected to be a UTF-8
    JSON object with the keys ``file_path``, ``task_id`` and ``status``.
    Messages missing ``file_path`` or ``task_id``, or whose ``status`` is not
    ``'queued'``, are reported as invalid and skipped. Valid tasks are passed
    to ``process_audio`` with the cache key ``asr:<task_id>``.

    Errors raised while handling a single message (including a payload that
    cannot be decoded as JSON) are printed and the loop moves on to the next
    message. The consumer is closed when the loop exits for any reason.
    """
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_BROKER],
        group_id='asr_group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
    )
    print("ASR消费者已启动")

    try:
        for message in consumer:
            try:
                # Decode here rather than through ``value_deserializer``: a
                # deserializer runs inside the consumer iterator, so a
                # malformed or empty payload would raise outside this
                # handler and stop the whole worker.
                task = json.loads(message.value.decode('utf-8'))
                if not isinstance(task, dict):
                    print(f"收到无效任务: {task}")
                    continue

                file_path = task.get('file_path')
                task_id = task.get('task_id')
                status = task.get('status')
                if not file_path or not task_id or status != 'queued':
                    print(f"收到无效任务: {task}")
                    continue

                cache_key = f"asr:{task_id}"
                print(f"开始处理任务: {cache_key}")
                await process_audio(file_path, cache_key)
                print(f"完成处理任务: {cache_key}")
            except Exception as e:
                print(f"处理消息时发生错误: {str(e)}")
    finally:
        # Release the consumer's connections when the loop ends or is
        # interrupted.
        consumer.close()
if __name__ == "__main__":
    # Script entry point: announce start-up, then drive the consumer
    # coroutine to completion on a fresh event loop.
    print("启动Kafka消费者处理ASR请求...")
    consumer_coro = kafka_consumer()
    asyncio.run(consumer_coro)