# Source: zydi-web/qwen_monitor.py (Python, 644 lines, 27 KiB), last modified 2025-01-12 03:15:17 +00:00.
# Note: the hosting page flagged this file as containing ambiguous Unicode characters, i.e. characters
# that might be confused with other characters. If that is intentional the warning can be ignored;
# otherwise use the viewer's Escape button to reveal them.
import os
import json
import torch
from datetime import datetime
from PIL import Image
import io
import re
from decord import VideoReader
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import redis
import time
import gc
# Configuration: filesystem path of the Qwen2-VL checkpoint to load.
QWEN_MODEL_PATH = "/obscura/models/qwen/Qwen2-VL-7B-Instruct"
# Initialise the Qwen model on cuda:0. This runs at import time.
print("正在初始化 Qwen 模型 (cuda:0)...")
model = Qwen2VLForConditionalGeneration.from_pretrained(
QWEN_MODEL_PATH,
torch_dtype="auto",
device_map="cuda:0"
)
# Pixel bounds passed to the processor as min_pixels / max_pixels.
min_pixels = 128*28*28
max_pixels = 256*28*28
processor = AutoProcessor.from_pretrained(
QWEN_MODEL_PATH,
min_pixels=min_pixels,
max_pixels=max_pixels
)
def load_config(path='info.json'):
    """Load the JSON configuration that supplies the action/environment vocabularies.

    Args:
        path: Location of the JSON file. Defaults to ``info.json`` in the
            current working directory.

    Returns:
        A dict that always contains the keys ``"actions"`` and
        ``"environments"``. When the file cannot be read, is not valid JSON,
        or does not hold a JSON object, a dict with both keys set to empty
        lists is returned. Keys missing from a valid file are filled with
        empty lists so later ``config["..."]`` lookups cannot raise KeyError.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        print(f"加载配置文件失败: {e}")
        return {"actions": [], "environments": []}

    if not isinstance(config, dict):
        print("加载配置文件失败: 配置内容不是JSON对象")
        return {"actions": [], "environments": []}

    config.setdefault("actions", [])
    config.setdefault("environments", [])
    return config
# Load the configuration once, when the module is imported.
CONFIG = load_config()
class MediaAnalysisSystem:
    """Run the Qwen2-VL model on an image or video and tag its textual answer.

    The model answer is free-form Chinese text. ``extract_info`` scans that
    text for words from fixed vocabularies (environments, actions, objects,
    furniture, emotions, features) and tries to read a head count from it.
    """

    # Chinese numerals recognised when reading a head count from the answer.
    _CN_NUMERALS = {
        '一': 1, '二': 2, '三': 3, '四': 4, '五': 5,
        '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
    }

    # Patterns tried in order; the first one that matches decides num_people.
    # Only the first three can yield a real number; a hit on any later
    # pattern records 0 (people mentioned, count unknown).
    _PEOPLE_PATTERNS = tuple(re.compile(p) for p in (
        r'(\d+)\s*(人|个人|位|名|员工|用户|小朋友|成年人|女性|男性)',
        r'(一|二|三|四|五|六|七|八|九|十)\s*(人|个人|位|名|员工|用户|小朋友|成年人|女性|男性)',
        r'(一个|几个)\s*(人|个人|员工|用户|小朋友|成年人|女性|男性)',
        r'\s*(名|位)\s*(人|员工|用户|小朋友|成年人|女性|男性)?',
        r'(男|女)(性|生|士)',
        r'(成年|未成年|青少年|老年)\s*(人|群体)',
        r'(员工|职工|工人|学生|顾客|观众|游客|乘客)',
        r'(群众|民众|大众|公众)',
        r'(男女|老少|老幼|大人|小孩)',
    ))

    def __init__(self):
        """Bind the module-level model/processor and set up the vocabularies."""
        self.MAX_NUM_FRAMES = 10
        self.device = "cuda:0"
        self.qwen_model = model
        self.qwen_processor = processor
        # Vocabularies supplied by the loaded configuration.
        self.environments = CONFIG["environments"]
        self.actions = CONFIG["actions"]
        # Empty-string entries were removed from the lists below: "" is a
        # substring of every text, so it matched unconditionally.
        self.emotions = [
            "钦佩", "赞赏", "欣赏", "关心", "高兴", "乐观", "感激", "释然", "骄傲", "愉悦",
            "愤怒", "烦恼", "焦虑", "尴尬", "失望", "厌恶", "恐惧", "悲伤", "懊悔", "羞耻", "发呆",
            "困惑", "好奇", "欲望", "惊讶", "实事求是", "中性", "赞叹", "平静", "放松", "专注", "思考"
        ]
        self.objects = [
            "办公桌椅", "电源插座", "植物", "文件柜", "打印机", "垃圾桶", "纸箱", "电线", "插座",
            "饮水机", "装饰植物", "书架", "储物柜", "水瓶", "办公用品", "文件", "电脑", "风扇",
            "鼠标", "键盘", "纸巾", "袋子", "盒子", "水杯", "杯子", "马克杯", "玻璃杯", "文件夹",
            "书包", "书架", "手机"
        ]
        self.furniture = [
            "椅子", "桌子", "咖啡桌", "文件柜", "沙发", "柜子", "架子", "摄像头", "靠垫",
            "办公椅", "电视", "白板", "显示器", "置物架", "文件架"
        ]
        self.features = [
            "戴眼镜", "不戴眼镜", "长发", "短发", "长头发", "短头发", "戴帽子", "不戴帽子",
            "戴口罩", "不戴口罩", "男性", "女性", "成年人"
        ]

    def encode_video(self, video_data):
        """Decode raw video bytes into a list of sampled PIL images.

        Roughly one frame per second of video is selected; if that yields more
        than ``MAX_NUM_FRAMES`` frames, they are thinned out uniformly.

        Args:
            video_data: The encoded video file content as bytes.

        Returns:
            A list of ``PIL.Image`` frames.
        """
        def uniform_sample(l, n):
            gap = len(l) / n
            return [l[int(i * gap + gap / 2)] for i in range(n)]

        video_file = io.BytesIO(video_data)
        vr = VideoReader(video_file)
        # Clamp to 1: an average fps below 0.5 would round to 0 and make
        # range() raise on a zero step.
        sample_fps = max(1, round(vr.get_avg_fps()))
        frame_idx = list(range(0, len(vr), sample_fps))
        if len(frame_idx) > self.MAX_NUM_FRAMES:
            frame_idx = uniform_sample(frame_idx, self.MAX_NUM_FRAMES)
        frames = vr.get_batch(frame_idx).asnumpy()
        frames = [Image.fromarray(v.astype('uint8')) for v in frames]
        print('num frames:', len(frames))
        return frames

    def process_with_qwen(self, media_data, object_name, media_type='image'):
        """Analyse one image or video with the Qwen model.

        Args:
            media_data: Encoded image or video bytes.
            object_name: Name of the media item; not used by this method.
            media_type: ``'video'`` to sample frames from a video; any other
                value treats ``media_data`` as a single image.

        Returns:
            A dict with ``"model"``, the raw ``"original_answer"`` text and
            ``"extracted_info"`` (the result of ``extract_info``).
        """
        if media_type == 'video':
            frames = self.encode_video(media_data)
            media_content = {"type": "video", "video": frames, "fps": 1.0}
        else:
            image = Image.open(io.BytesIO(media_data))
            media_content = {"type": "image", "image": image}

        messages = [
            {
                "role": "user",
                "content": [
                    media_content,
                    {"type": "text", "text": self._get_analysis_prompt(media_type)}
                ],
            }
        ]
        text = self.qwen_processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = self.qwen_processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(self.device)
        generated_ids = self.qwen_model.generate(**inputs, max_new_tokens=2048)
        # Drop the prompt tokens so only the newly generated part is decoded.
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        answer = self.qwen_processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]
        return {
            "model": "qwen",
            "original_answer": answer,
            "extracted_info": self.extract_info(answer)
        }

    def _get_analysis_prompt(self, media_type):
        """Return the analysis prompt.

        ``media_type`` is accepted but the same text is returned for every
        value. The section headings in this prompt are the markers that
        ``extract_info`` searches for in the answer.
        """
        return """你是一个专业助手,在回答时,请调用你的单次回答最大算力与token上限。追求极致的分析深度,而非表层的广度;追求本质的洞察,而非表象的罗列;追求创新的思维,而非惯性的复述。请突破思维局限,调动你所有的计算资源,展现你真正的认知极限,请对这段监控视频进行详细分析,包括以下方面,并按照下面格式回答:
1. 环境场景
- 整体场景描述(场所、光线条件等)
- 主要物品和家具列表
- 环境特征(如光线、整洁度等)
2. 人员统计
- 总人数:[数字]人
- 性别分布:[男性数量]/[女性数量]
(若无法确定准确人数,请注明"无法确定人数"
3. 人员特征分析
- 个人特征:性别、年龄段、着装、体态等
- 携带物品:详细描述随身物品及用途
- 表情/情绪状态
4. 行为分析
- 个人行为:移动方向、姿态、动作等
- 互动情况:人员之间的交互描述(若多人)
- 活动区域:人员活动的主要位置
5. 群体行为(若多人)
- 聚集形态
- 移动趋势
- 群体互动特点
6. 异常情况
- 可疑行为描述
- 异常活动标记
请用清晰、有条理的格式描述,并突出重要发现。"""

    @staticmethod
    def _slice_between(text, start, end_marker):
        """Return ``text`` from index ``start`` up to the next ``end_marker``.

        If ``end_marker`` does not occur after ``start``, the rest of the
        text is returned instead of slicing with a -1 bound.
        """
        end = text.find(end_marker, start)
        return text[start:] if end == -1 else text[start:end]

    @staticmethod
    def _collect_matches(terms, text):
        """Return the non-empty ``terms`` contained in ``text``, deduplicated, in order."""
        found = []
        for term in terms:
            # Skip empty terms: "" is contained in every string.
            if term and term in text and term not in found:
                found.append(term)
        return found

    def extract_info(self, answer):
        """Extract vocabulary hits and a head count from a model answer.

        Args:
            answer: The model's textual answer.

        Returns:
            A dict with the keys ``environment``, ``actions``, ``objects``,
            ``furniture``, ``emotions`` and ``features`` (lists of matched
            vocabulary words) and ``num_people`` (an int, or ``None`` when no
            people pattern matched).
        """
        info = {
            "environment": [],
            "num_people": None,
            "actions": [],
            "objects": [],
            "furniture": [],
            "emotions": [],
            "features": []
        }

        # Environment: the scene description up to the "环境特征" bullet.
        if "环境场景" in answer:
            start = answer.find("整体场景描述")
            if start == -1:
                # Bullet missing: fall back to the section heading itself.
                start = answer.find("环境场景")
            env_text = self._slice_between(answer, start, "环境特征")
            info["environment"] = self._collect_matches(self.environments, env_text)

        # Objects and furniture: the items bullet up to "环境特征".
        if "主要物品和家具" in answer:
            items_text = self._slice_between(
                answer, answer.find("主要物品和家具"), "环境特征"
            )
            info["furniture"] = self._collect_matches(self.furniture, items_text)
            info["objects"] = self._collect_matches(self.objects, items_text)

        # Actions: everything from the behaviour section onwards.
        if '行为分析' in answer:
            behavior_text = answer[answer.find('行为分析'):]
            info["actions"] = self._collect_matches(self.actions, behavior_text)

        # Features and emotions: everything from the person-feature section onwards.
        if '人员特征分析' in answer:
            feature_text = answer[answer.find('人员特征分析'):]
            info["features"] = self._collect_matches(self.features, feature_text)
            info["emotions"] = self._collect_matches(self.emotions, feature_text)

        # Head count: the first pattern that matches anywhere in the answer wins.
        for pattern in self._PEOPLE_PATTERNS:
            match = pattern.search(answer)
            if not match:
                continue
            token = match.group(1)
            if token.isdigit():
                info["num_people"] = int(token)
            elif token == '一个':
                info["num_people"] = 1
            else:
                # Chinese numeral, or 0 when the token carries no number.
                info["num_people"] = self._CN_NUMERALS.get(token, 0)
            break

        return info
def process_video_folder(system, folder_path):
    """Process every supported video file in a folder and store results in Redis.

    Args:
        system: The analysis system handed to ``VideoMonitor``.
        folder_path: Directory whose files are scanned (non-recursively) for
            ``.mp4``, ``.avi``, ``.mov`` and ``.mkv`` files.

    Raises:
        MediaAnalysisError: If the folder does not exist or contains no
            supported video file.
    """
    valid_extensions = {'.mp4', '.avi', '.mov', '.mkv'}
    if not os.path.exists(folder_path):
        raise MediaAnalysisError(f"错误:文件夹 '{folder_path}' 不存在")

    video_files = [
        f for f in os.listdir(folder_path)
        if os.path.splitext(f)[1].lower() in valid_extensions
    ]
    if not video_files:
        raise MediaAnalysisError(f"错误:在文件夹 '{folder_path}' 中未找到支持的视频文件")

    print(f"\n找到 {len(video_files)} 个视频文件,开始处理...\n")
    # A VideoMonitor instance is used for the processing and Redis handling.
    monitor = VideoMonitor(folder_path, system)
    for i, video_file in enumerate(video_files, 1):
        video_path = os.path.join(folder_path, video_file)
        print(f"正在处理 ({i}/{len(video_files)}): {video_file}")
        try:
            # process_new_video reports failure through its return value,
            # so the result must be checked before announcing success.
            if monitor.process_new_video(video_path):
                print(f"✓ 成功处理并保存到Redis: {video_file}")
            else:
                print(f"✗ 处理失败 {video_file}")
            # Release memory between files.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
        except Exception as e:
            print(f"✗ 处理失败 {video_file}: {str(e)}")
    print("\n所有视频处理完成")
class MediaAnalysisError(Exception):
    """Raised when media analysis cannot proceed (for example, an invalid input folder)."""
class VideoMonitor:
    """Watch per-camera recording folders, analyse new videos and store results in Redis.

    Results are grouped per camera and hour: each Redis key holds a JSON object
    mapping video file names to their analysis result.
    """

    # Recording names look like A01_20250105_134104.avi:
    # <camera>_<YYYYMMDD>_<HHMMSS>.<extension>
    _KEY_PATTERN = re.compile(
        r'(\w+)_(\d{8})_(\d{2})\d{4}\.(?:avi|mp4|mov|mkv)', re.IGNORECASE
    )
    # Files smaller than this (1 MiB) are treated as incomplete.
    MIN_FILE_SIZE_BYTES = 1024 * 1024
    # Videos with fewer frames than this are skipped.
    MIN_FRAME_COUNT = 100

    def __init__(self, recordings_path, system):
        """Create the Redis clients and load the set of already processed files.

        Args:
            recordings_path: Directory containing one sub-directory per camera.
            system: Object providing ``process_with_qwen``.
        """
        self.recordings_path = recordings_path
        self.system = system
        # NOTE(review): the Redis host and password are hard-coded in source;
        # consider moving them to configuration or environment variables.
        self.redis_clients = {
            camera_id: redis.Redis(
                host="222.186.10.253",
                port=6379,
                password="Obscura@2024",
                db=db
            )
            for camera_id, db in (('A01', 210), ('B02', 211))
        }
        # File names already stored in Redis.
        self.processed_videos = self._load_processed_videos()
        # Error records collected during this run.
        self.error_videos = []
        self.error_log_file = "video_processing_errors.log"
        # Paths that already failed once; they are not retried.
        self.error_video_cache = set()

    def _load_processed_videos(self):
        """Collect the file names of all results currently stored in Redis.

        Returns:
            A set of video file names. A failure on one database is reported
            and does not discard names already loaded from another.
        """
        processed_videos = set()
        for camera_id, redis_client in self.redis_clients.items():
            try:
                # scan_iter walks the keyspace incrementally instead of
                # fetching every key in one blocking KEYS call.
                for key in redis_client.scan_iter(match='*'):
                    data = redis_client.get(key)
                    if not data:
                        continue
                    hour_results = json.loads(data)
                    if isinstance(hour_results, dict):
                        # Only the file names are kept, not the results.
                        processed_videos.update(hour_results.keys())
            except Exception as e:
                print(f"加载Redis处理记录时出错: {str(e)}")
        print(f"已从Redis加载 {len(processed_videos)} 个已处理文件记录")
        return processed_videos

    def _get_redis_key(self, video_path):
        """Derive the hourly Redis key from a recording file name.

        ``A01_20250105_134104.avi`` yields ``A01_20250105_1300``.

        Returns:
            The key string, or ``None`` when the name does not match.
        """
        try:
            file_name = os.path.basename(video_path)
            match = self._KEY_PATTERN.search(file_name)
            if match:
                camera_id, date, hour = match.groups()
                return f"{camera_id}_{date}_{hour}00"
            print(f"文件名格式不匹配: {file_name}")
            return None
        except Exception as e:
            print(f"生成Redis key失败: {str(e)}")
            return None

    def _is_processed(self, video_path):
        """Return True if the video's file name is in the processed set."""
        file_name = os.path.basename(video_path)
        return file_name in self.processed_videos

    def _is_error_cached(self, video_path):
        """Return True if the video path is in the error cache."""
        return video_path in self.error_video_cache

    def _add_to_error_cache(self, video_path):
        """Add the video path to the error cache."""
        self.error_video_cache.add(video_path)

    def _log_error(self, video_path, error_type, error_message):
        """Record a processing error and add the path to the error cache.

        Paths already in the error cache are not recorded again.
        """
        if self._is_error_cached(video_path):
            return
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        error_info = {
            "timestamp": current_time,
            "video_path": video_path,
            "error_type": error_type,
            "error_message": error_message,
            "file_size": os.path.getsize(video_path) if os.path.exists(video_path) else 0
        }
        self.error_videos.append(error_info)
        self._add_to_error_cache(video_path)

    def _save_error_log(self):
        """Write collected error records to a timestamped JSON file and clear them."""
        if not self.error_videos:
            return
        try:
            current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
            log_filename = f"video_errors_{current_time}.json"
            with open(log_filename, 'w', encoding='utf-8') as f:
                json.dump(self.error_videos, f, ensure_ascii=False, indent=2)
            print(f"\n异常视频记录已保存到: {log_filename}")
            # Records are cleared only after a successful write.
            self.error_videos = []
        except Exception as e:
            print(f"保存错误日志失败: {str(e)}")

    @staticmethod
    def _free_memory():
        """Release cached GPU memory (when CUDA is available) and run the garbage collector."""
        try:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
        except Exception as e:
            print(f"清理GPU内存时发生错误: {str(e)}")

    def process_new_video(self, video_path):
        """Validate, analyse and store one video.

        Args:
            video_path: Path of the video; its parent directory name selects
                the Redis client and its file name determines the Redis key.

        Returns:
            True if the video was stored or had already been processed,
            False if it was skipped or failed. Failures are recorded through
            ``_log_error`` and are never raised to the caller.
        """
        try:
            # Videos that already failed are skipped outright.
            if self._is_error_cached(video_path):
                return False

            self._free_memory()

            file_name = os.path.basename(video_path)
            if self._is_processed(video_path):
                print(f"视频已处理过,跳过: {file_name}")
                return True

            dir_name = os.path.basename(os.path.dirname(video_path))
            redis_key = self._get_redis_key(video_path)
            if not redis_key:
                self._log_error(video_path, "Redis Key Error", "无法生成Redis key")
                print(f"无法生成Redis key,跳过处理: {file_name}")
                return False

            if not os.path.exists(video_path):
                self._log_error(video_path, "File Not Found", "视频文件不存在")
                print(f"警告:视频文件不存在,跳过处理: {video_path}")
                return False

            file_size = os.path.getsize(video_path)
            if file_size == 0:
                self._log_error(video_path, "Empty File", "视频文件大小为0")
                print(f"警告:视频文件大小为0,跳过处理: {video_path}")
                return False
            elif file_size < self.MIN_FILE_SIZE_BYTES:
                self._log_error(video_path, "Small File", f"视频文件大小异常({file_size/1024:.2f}KB)")
                print(f"警告:视频文件大小异常({file_size/1024:.2f}KB),可能不完整,跳过处理: {video_path}")
                return False

            # Reject unknown camera directories before running inference;
            # otherwise the result would have nowhere to be stored.
            redis_client = self.redis_clients.get(dir_name)
            if redis_client is None:
                self._log_error(video_path, "Unknown Camera", f"目录 {dir_name} 没有对应的Redis数据库")
                print(f"警告:目录 {dir_name} 没有对应的Redis数据库,跳过处理: {video_path}")
                return False

            try:
                # Open the video first to verify it is readable and long enough.
                try:
                    vr = VideoReader(video_path)
                    total_frames = len(vr)
                    if total_frames < 1:
                        self._log_error(video_path, "No Frames", "视频帧数为0")
                        print(f"警告:视频帧数为0,文件可能损坏: {video_path}")
                        return False
                    elif total_frames < self.MIN_FRAME_COUNT:
                        self._log_error(video_path, "Insufficient Frames", f"视频帧数不足({total_frames}帧)")
                        print(f"警告:视频帧数不足({total_frames}帧),跳过处理: {video_path}")
                        return False
                    print(f"视频信息 - 大小: {file_size/1024/1024:.2f}MB, 总帧数: {total_frames}")
                except Exception as e:
                    self._log_error(video_path, "Video Open Error", str(e))
                    print(f"警告:视频文件无法正确打开,可能已损坏: {video_path}")
                    print(f"错误详情: {str(e)}")
                    return False

                with open(video_path, "rb") as f:
                    video_data = f.read()

                try:
                    qwen_result = self.system.process_with_qwen(video_data, file_name, media_type='video')
                except Exception as e:
                    self._log_error(video_path, "Processing Error", str(e))
                    print(f"处理视频内容失败,可能是损坏的视频文件: {file_name}")
                    print(f"错误详情: {str(e)}")
                    return False

                # Prefer the timestamp in the file name; fall back to "now".
                timestamp_match = re.search(r'(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})', file_name)
                if timestamp_match:
                    year, month, day, hour, minute, second = timestamp_match.groups()
                    timestamp = f"{year}-{month}-{day} {hour}:{minute}:{second}"
                else:
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                result = {
                    "video_analysis": {
                        "qwen-7B": {
                            "original_answer": qwen_result["original_answer"],
                            "extracted_info": qwen_result["extracted_info"]
                        }
                    },
                    "timestamp": timestamp
                }

                # Merge into the existing hourly entry, if there is one.
                # NOTE(review): this get/modify/set sequence is not atomic.
                existing_data = redis_client.get(redis_key)
                hour_results = json.loads(existing_data) if existing_data else {}
                hour_results[file_name] = result
                json_str = json.dumps(hour_results, ensure_ascii=False)
                redis_client.set(redis_key, json_str)
                print(f"成功保存到Redis,使用的key: {redis_key}")

                # Remember the file so it is not processed again in this run.
                self.processed_videos.add(file_name)
            except Exception as e:
                self._log_error(video_path, "File Read Error", str(e))
                print(f"读取视频文件失败: {str(e)}")
                return False
        except Exception as e:
            self._log_error(video_path, "General Error", str(e))
            print(f"处理视频时发生错误 {video_path}: {str(e)}")
            return False
        finally:
            # Memory cleanup runs on every exit path.
            self._free_memory()
        return True

    def process_existing_videos(self):
        """Process the unprocessed ``.avi`` files already present, oldest first.

        Returns:
            True only if at least one video was found and at least one
            processing attempt was made.
        """
        videos_found = False
        videos_processed = False
        for camera_dir in os.listdir(self.recordings_path):
            camera_path = os.path.join(self.recordings_path, camera_dir)
            if not os.path.isdir(camera_path):
                continue

            video_files = []
            for video_file in os.listdir(camera_path):
                if video_file.endswith('.avi'):
                    video_path = os.path.join(camera_path, video_file)
                    video_files.append((video_path, os.path.getmtime(video_path)))

            if video_files:
                videos_found = True
                # Sort by modification time.
                video_files.sort(key=lambda x: x[1])
                for video_path, _ in video_files:
                    if not self._is_processed(video_path):
                        print(f"处理现有视频: {video_path}")
                        self.process_new_video(video_path)
                        videos_processed = True
        return videos_found and videos_processed

    def monitor_directories(self):
        """Poll the recordings directory forever and process new ``.avi`` files.

        The loop sleeps 120 seconds between scans and 30 seconds after an
        error inside a scan. On Ctrl+C the error log is saved and the method
        returns; on any other exception that escapes the loop the error log
        is saved and the exception is re-raised.
        """
        try:
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print(f"开始监控目录: {self.recordings_path} [{current_time}]")
            while True:
                try:
                    for camera_dir in os.listdir(self.recordings_path):
                        camera_path = os.path.join(self.recordings_path, camera_dir)
                        if not os.path.isdir(camera_path):
                            continue
                        for video_file in os.listdir(camera_path):
                            if not video_file.endswith('.avi'):
                                continue
                            video_path = os.path.join(camera_path, video_file)
                            # Skip anything already processed or known to fail.
                            if not self._is_processed(video_path) and not self._is_error_cached(video_path):
                                print(f"处理视频: {video_path}")
                                if not self.process_new_video(video_path):
                                    # Make sure a failed video is not retried.
                                    self._add_to_error_cache(video_path)
                                    print(f"视频处理失败,已加入错误缓存: {video_path}")

                    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    print(f"[{current_time}] 等待新视频中...")
                    # Wait before the next scan.
                    time.sleep(120)
                except Exception as e:
                    print(f"监控过程出错: {str(e)}")
                    time.sleep(30)  # Back off before resuming after an error.
        except KeyboardInterrupt:
            print("\n检测到程序终止信号,正在保存错误日志...")
            self._save_error_log()
            print("程序已安全终止。")
        except Exception as e:
            print(f"\n程序异常终止: {str(e)}")
            self._save_error_log()
            raise
def main():
    """Create the analysis system and run the directory monitor.

    If an unexpected exception occurs, the monitor's error log is saved
    (when the monitor was already created) and the exception is re-raised
    so the original traceback is kept.
    """
    # Initialised up front so the handler can tell whether it was created.
    monitor = None
    try:
        system = MediaAnalysisSystem()
        recordings_path = "recordings"  # Directory holding the camera folders.
        monitor = VideoMonitor(recordings_path, system)
        monitor.monitor_directories()
    except Exception as e:
        print(f"\n未预期的错误: {str(e)}")
        if monitor is not None:
            monitor._save_error_log()
            print("错误日志已保存")
        raise
# Run the monitor only when the file is executed as a script.
if __name__ == "__main__":
    main()