Files
zydi-web/test_history/qwen copy.py
T
2025-01-12 03:01:51 +00:00

554 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import torch
from datetime import datetime
from PIL import Image
import io
import re
from decord import VideoReader
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import redis
import time
import gc
# 配置
QWEN_MODEL_PATH = "/obscura/models/qwen/Qwen2-VL-7B-Instruct"
# 初始化 Qwen 模型 (使用 cuda:0)
print("正在初始化 Qwen 模型 (cuda:0)...")
model = Qwen2VLForConditionalGeneration.from_pretrained(
QWEN_MODEL_PATH,
torch_dtype="auto",
device_map="cuda:0"
)
min_pixels = 128*28*28
max_pixels = 256*28*28
processor = AutoProcessor.from_pretrained(
QWEN_MODEL_PATH,
min_pixels=min_pixels,
max_pixels=max_pixels
)
# 在文件开头添加加载配置的代码
def load_config():
"""加载配置文件"""
try:
with open('info.json', 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except Exception as e:
print(f"加载配置文件失败: {e}")
return {"actions": [], "environments": []}
# 加载配置
CONFIG = load_config()
class MediaAnalysisSystem:
def __init__(self):
self.MAX_NUM_FRAMES = 10
self.device = "cuda:0"
self.qwen_model = model
self.qwen_processor = processor
# 使用加载的配置
self.environments = CONFIG["environments"]
self.actions = CONFIG["actions"]
self.emotions = [
"钦佩", "赞赏", "欣赏","关心", "高兴", "", "乐观", "感激", "释然", "骄傲", "愉悦",
"愤怒", "烦恼", "焦虑", "尴尬", "失望", "厌恶", "恐惧", "悲伤", "懊悔", "羞耻","发呆",
"困惑", "好奇", "欲望", "惊讶", "实事求是", "中性", "赞叹","平静","放松","专注","思考"
]
self.objects = [
"办公桌椅","文件柜","打印机","饮水机","装饰植物","书架","储物柜","水瓶", "办公用品", "文件", "电脑","风扇","鼠标","键盘","纸巾","","","袋子","盒子","水杯","杯子","马克杯","玻璃杯","文件夹","书包","书架","手机"
]
self.furniture = [
"椅子", "桌子", "咖啡桌", "文件柜", "", "沙发","柜子","架子","摄像头","靠垫","办公椅","电视","白板","显示器","置物架","文件架"
]
self.features = [
"戴眼镜","不戴眼镜","长发","短发","长头发","短头发","戴帽子","不戴帽子","戴口罩","不戴口罩","男性","女性","","","","","","","成年人"
]
def encode_video(self, video_data):
def uniform_sample(l, n):
gap = len(l) / n
return [l[int(i * gap + gap / 2)] for i in range(n)]
video_file = io.BytesIO(video_data)
vr = VideoReader(video_file)
sample_fps = round(vr.get_avg_fps() / 1)
frame_idx = list(range(0, len(vr), sample_fps))
if len(frame_idx) > self.MAX_NUM_FRAMES:
frame_idx = uniform_sample(frame_idx, self.MAX_NUM_FRAMES)
frames = vr.get_batch(frame_idx).asnumpy()
frames = [Image.fromarray(v.astype('uint8')) for v in frames]
print('num frames:', len(frames))
return frames
def process_with_qwen(self, media_data, object_name, media_type='image'):
"""使用 Qwen 模型处理媒体"""
if media_type == 'video':
frames = self.encode_video(media_data)
media_content = {"type": "video", "video": frames, "fps": 1.0}
else:
image = Image.open(io.BytesIO(media_data))
media_content = {"type": "image", "image": image}
messages = [
{
"role": "user",
"content": [
media_content,
{"type": "text", "text": self._get_analysis_prompt(media_type)}
],
}
]
text = self.qwen_processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = self.qwen_processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(self.device)
generated_ids = self.qwen_model.generate(**inputs, max_new_tokens=2048)
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
answer = self.qwen_processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0]
return {
"model": "qwen",
"original_answer": answer,
"extracted_info": self.extract_info(answer)
}
def _get_analysis_prompt(self, media_type):
"""获取分析提示词"""
return f"""你是一位视频描述专家,你擅长对视频进行详细的描述,请对这段监控视频进行详细分析,包括以下方面,并按照下面格式回答:
1. 环境场景
- 整体场景描述(室内/室外、光线条件等)
- 主要物品和家具列表
- 环境特征(如光线、整洁度等)
2. 人员统计
- 总人数:[数字]人
- 性别分布:[男性数量]/[女性数量]
(若无法确定准确人数,请注明"无法确定人数"
3. 人员特征分析
- 个人特征:性别、年龄段、着装、体态等
- 携带物品:详细描述随身物品及用途
- 表情/情绪状态
4. 行为分析
- 个人行为:移动方向、姿态、动作等
- 互动情况:人员之间的交互描述(若多人)
- 活动区域:人员活动的主要位置
5. 群体行为(若多人)
- 聚集形态
- 移动趋势
- 群体互动特点
6. 异常情况
- 可疑行为描述
- 异常活动标记
- 需要注意的安全隐患
请用清晰、有条理的格式描述,并突出重要发现。"""
def extract_info(self, answer):
"""提取中文信息"""
info = {
"environment": None,
"num_people": None,
"actions": [],
"objects": [],
"furniture": [],
"emotions": [],
"features": []
}
# 将回答按章节分割
sections = {}
current_section = None
for line in answer.split('\n'):
if line.startswith('###'):
current_section = line.strip('# ').lower()
sections[current_section] = []
elif current_section and line.strip():
sections[current_section].append(line.strip())
# 从"行为分析"部分提取动作
if '行为分析' in sections:
behavior_text = ' '.join(sections['行为分析'])
# 使用加载的动作列表
for action in self.actions:
if action in behavior_text:
if action not in info["actions"]: # 避免重复
info["actions"].append(action)
# 从"环境场景"部分提取物品和家具
if '环境场景' in sections:
scene_text = ' '.join(sections['环境场景'])
for obj in self.objects: # 假设已将objects移到类属性
if obj in scene_text:
if obj not in info["objects"]:
info["objects"].append(obj)
for item in self.furniture: # 假设已将furniture移到类属性
if item in scene_text:
if item not in info["furniture"]:
info["furniture"].append(item)
# 从"人员特征分析"部分提取特征和情绪
if '人员特征分析' in sections:
feature_text = ' '.join(sections['人员特征分析'])
for feature in self.features: # 假设已将features移到类属性
if feature in feature_text:
if feature not in info["features"]:
info["features"].append(feature)
for emotion in self.emotions: # 假设已将emotions移到类属性
if emotion in feature_text:
if emotion not in info["emotions"]:
info["emotions"].append(emotion)
# 中文数字模式
people_patterns = [
r'(\d+)\s*(人|个人|位|名|员工|用户|小朋友|成年人|女性|男性)',
r'(一|二|三|四|五|六|七|八|九|十)\s*(人|个人|位|名|员工|用户|小朋友|成年人|女性|男性)',
r'(一个|几个)\s*(人|个人|员工|用户|小朋友|成年人|女性|男性)',
r'\s*(名|位)\s*(人|员工|用户|小朋友|成年人|女性|男性)?',
r'(男|女)(性|生|士)',
r'(成年|未成年|青少年|老年)\s*(人|群体)',
r'(员工|职工|工人|学生|顾客|观众|游客|乘客)',
r'(群众|民众|大众|公众)',
r'(男女|老少|老幼|大人|小孩)'
]
for pattern in people_patterns:
match = re.search(pattern, answer)
if match:
if match.group(1).isdigit():
info["num_people"] = int(match.group(1))
elif match.group(1) in ['一个', '']:
info["num_people"] = 1
else:
num_word_to_digit = {
'': 2, '': 3, '': 4, '': 5,
'': 6, '': 7, '': 8, '': 9, '': 10
}
info["num_people"] = num_word_to_digit.get(match.group(1), 0)
break
return info
def process_video_folder(system, folder_path):
"""处理文件夹中的所有视频文件并保存到Redis"""
valid_extensions = {'.mp4', '.avi', '.mov', '.mkv'}
if not os.path.exists(folder_path):
raise MediaAnalysisError(f"错误:文件夹 '{folder_path}' 不存在")
video_files = [
f for f in os.listdir(folder_path)
if os.path.splitext(f)[1].lower() in valid_extensions
]
if not video_files:
raise MediaAnalysisError(f"错误:在文件夹 '{folder_path}' 中未找到支持的视频文件")
print(f"\n找到 {len(video_files)} 个视频文件,开始处理...\n")
# 创建VideoMonitor实例用于Redis操作
monitor = VideoMonitor(folder_path, system)
for i, video_file in enumerate(video_files, 1):
video_path = os.path.join(folder_path, video_file)
print(f"正在处理 ({i}/{len(video_files)}): {video_file}")
try:
# 使用VideoMonitor的process_new_video方法处理并保存到Redis
monitor.process_new_video(video_path)
print(f"✓ 成功处理并保存到Redis: {video_file}")
# 清理内存
if torch.cuda.is_available():
torch.cuda.empty_cache()
import gc
gc.collect()
except Exception as e:
print(f"✗ 处理失败 {video_file}: {str(e)}")
print(f"\n所有视频处理完成")
class MediaAnalysisError(Exception):
"""自定义媒体分析异常类"""
pass
# 在 MediaAnalysisSystem 类后添加新的监听类
class VideoMonitor:
def __init__(self, recordings_path, system):
self.recordings_path = recordings_path
self.system = system
self.redis_clients = {
'A01': redis.Redis(
host="222.186.10.253",
port=6379,
password="Obscura@2024",
db=210
),
'B02': redis.Redis(
host="222.186.10.253",
port=6379,
password="Obscura@2024",
db=211
)
}
# 新增:初始化时加载已处理的视频记录
self.processed_videos = self._load_processed_videos()
def _load_processed_videos(self):
"""从Redis加载所有已处理的视频文件名"""
processed_videos = set()
try:
for camera_id, redis_client in self.redis_clients.items():
# 获取所有小时级别的键
for key in redis_client.keys('*'):
key_str = key.decode('utf-8')
# 只获取键中存储的文件名列表,而不是完整的处理结果
data = redis_client.get(key)
if data:
hour_results = json.loads(data)
# 只添加文件名到集合中
processed_videos.update(hour_results.keys())
print(f"已从Redis加载 {len(processed_videos)} 个已处理文件记录")
return processed_videos
except Exception as e:
print(f"加载Redis处理记录时出错: {str(e)}")
return set()
def _get_redis_key(self, video_path):
try:
# 从路径获取摄像头ID (目录名)
dir_name = os.path.basename(os.path.dirname(video_path))
file_name = os.path.basename(video_path) # 例如:A01_20250105_134104.avi
# 从视频文件名中提取日期和时间
match = re.search(r'(\w+)_(\d{8})_(\d{2})\d{4}\.avi', file_name)
if match:
camera_id = match.group(1) # A01
date = match.group(2) # 20250105
hour = match.group(3) # 13 (从134104中提取)
# 生成正确的key: A01_20250105_1300
redis_key = f"{camera_id}_{date}_{hour}00"
return redis_key
print(f"文件名格式不匹配: {file_name}")
return None
except Exception as e:
print(f"生成Redis key失败: {str(e)}")
return None
def _is_processed(self, video_path):
"""检查视频是否已处理"""
file_name = os.path.basename(video_path)
return file_name in self.processed_videos
def process_new_video(self, video_path):
try:
# 处理前清理
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
file_name = os.path.basename(video_path)
# 检查是否已处理
if self._is_processed(video_path):
print(f"视频已处理过,跳过: {file_name}")
return
# 获取camera_id和时间戳
dir_name = os.path.basename(os.path.dirname(video_path))
file_name = os.path.basename(video_path)
# 使用_get_redis_key获取正确的key
redis_key = self._get_redis_key(video_path)
if not redis_key:
print(f"无法生成Redis key,跳过处理: {file_name}")
return
# 添加视频文件检查
if not os.path.exists(video_path):
print(f"警告:视频文件不存在,跳过处理: {video_path}")
return False
if os.path.getsize(video_path) == 0:
print(f"警告:视频文件大小为0,跳过处理: {video_path}")
return False
# 处理视频
try:
with open(video_path, "rb") as f:
video_data = f.read()
try:
qwen_result = self.system.process_with_qwen(video_data, file_name, media_type='video')
except Exception as e:
print(f"处理视频内容失败,可能是损坏的视频文件: {file_name}")
print(f"错误详情: {str(e)}")
return False
# 从文件名提取时间戳
timestamp_match = re.search(r'(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})', file_name)
if timestamp_match:
year, month, day, hour, minute, second = timestamp_match.groups()
# 构建正确的时间戳格式 (YYYY-MM-DD HH:MM:SS)
timestamp = f"{year}-{month}-{day} {hour}:{minute}:{second}"
else:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
result = {
"video_analysis": {
"qwen-7B": {
"original_answer": qwen_result["original_answer"],
"extracted_info": qwen_result["extracted_info"]
}
},
"timestamp": timestamp # 使用从文件名提取的时间戳
}
# 保存到对应的Redis数据库
if dir_name in self.redis_clients:
redis_client = self.redis_clients[dir_name]
# 获取现有的小时数据(如果存在)
existing_data = redis_client.get(redis_key)
if existing_data:
hour_results = json.loads(existing_data)
hour_results[file_name] = result
else:
hour_results = {file_name: result}
# 保存更新后的数据
json_str = json.dumps(hour_results, ensure_ascii=False)
redis_client.set(redis_key, json_str)
print(f"成功保存到Redis,使用的key: {redis_key}") # 调试信息
# 处理完成后,更新内存中的记录
self.processed_videos.add(file_name)
except Exception as e:
print(f"读取视频文件失败: {str(e)}")
return False
except Exception as e:
print(f"处理视频时发生错误 {video_path}: {str(e)}")
return False
finally:
# 确保内存清理总是执行
if torch.cuda.is_available():
try:
torch.cuda.empty_cache()
gc.collect()
except Exception as e:
print(f"清理GPU内存时发生错误: {str(e)}")
return True
def process_existing_videos(self):
"""处理目录中现有的视频文件"""
videos_found = False
videos_processed = False # 新增标志,用于跟踪是否实际处理了视频
for camera_dir in os.listdir(self.recordings_path):
camera_path = os.path.join(self.recordings_path, camera_dir)
if not os.path.isdir(camera_path):
continue
# 获取所有.avi文件并按时间排序
video_files = []
for video_file in os.listdir(camera_path):
if video_file.endswith('.avi'):
video_path = os.path.join(camera_path, video_file)
video_files.append((video_path, os.path.getmtime(video_path)))
if video_files:
videos_found = True
# 按修改时间排序
video_files.sort(key=lambda x: x[1])
for video_path, _ in video_files:
if not self._is_processed(video_path):
print(f"处理现有视频: {video_path}")
self.process_new_video(video_path)
videos_processed = True # 标记已处理视频
# 只有当找到视频并且实际处理了视频时才返回True
return videos_found and videos_processed
def monitor_directories(self):
"""监控目录变化"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"开始监控目录: {self.recordings_path} [{current_time}]")
while True:
try:
# 首先处理现有视频
for camera_dir in os.listdir(self.recordings_path):
camera_path = os.path.join(self.recordings_path, camera_dir)
if not os.path.isdir(camera_path):
continue
for video_file in os.listdir(camera_path):
if not video_file.endswith('.avi'):
continue
video_path = os.path.join(camera_path, video_file)
if not self._is_processed(video_path):
print(f"处理视频: {video_path}")
if not self.process_new_video(video_path):
print(f"视频处理失败,继续处理下一个: {video_path}")
continue
# 添加状态提示
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] 等待新视频中...")
# 休眠一段时间再检查
time.sleep(120)
except Exception as e:
print(f"监控过程出错: {str(e)}")
time.sleep(30) # 出错后等待30秒再继续
def main():
try:
system = MediaAnalysisSystem()
recordings_path = "/home/zydi/VLM/recordings" # 设置recordings目录路径
# 创建并启动监控器
monitor = VideoMonitor(recordings_path, system)
monitor.monitor_directories()
except Exception as e:
print(f"\n未预期的错误: {str(e)}")
if __name__ == "__main__":
main()