Files
2025-01-23 09:05:45 +00:00

112 lines
4.0 KiB
Python

import os
import time
from PIL import Image
import torch
from ultralytics import YOLO
from config import PATH_CONFIG, MODEL_CONFIG
class PoseMonitor:
def __init__(self, images_path):
self.images_path = images_path
self.crop_path = PATH_CONFIG['crop']
self.model = YOLO(MODEL_CONFIG['yolo_pose_path'])
# 确保crop目录存在
if not os.path.exists(self.crop_path):
os.makedirs(self.crop_path)
def _process_image(self, image_path):
"""处理单张图片,检测人体并保存裁剪结果"""
try:
# 获取对应的保存路径
rel_path = os.path.relpath(image_path, self.images_path)
camera_dir = os.path.dirname(rel_path)
crop_dir = os.path.join(self.crop_path, camera_dir)
# 确保对应的相机目录存在
if not os.path.exists(crop_dir):
os.makedirs(crop_dir)
# 构建基础文件名
base_name = os.path.splitext(os.path.basename(image_path))[0]
# 使用YOLOPose进行检测
results = self.model(image_path)
# 获取原始图片
original_image = Image.open(image_path)
# 处理每个检测到的人体
for idx, box in enumerate(results[0].boxes.xyxy):
x1, y1, x2, y2 = box.tolist()
x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
# 裁剪人体区域
cropped = original_image.crop((x1, y1, x2, y2))
# 构建保存路径(添加后缀)
if idx == 0:
save_name = f"{base_name}_0.jpg"
else:
save_name = f"{base_name}_{idx}.jpg"
save_path = os.path.join(crop_dir, save_name)
# 保存裁剪后的图片
cropped.save(save_path)
print(f"已保存裁剪图片: {save_path}")
except Exception as e:
print(f"处理图片失败 {image_path}: {str(e)}")
def monitor_directories(self):
"""监控目录变化"""
try:
print(f"开始监控目录: {self.images_path}")
processed_files = set()
while True:
try:
# 处理所有图片文件
for camera_dir in os.listdir(self.images_path):
camera_path = os.path.join(self.images_path, camera_dir)
if not os.path.isdir(camera_path):
continue
for image_file in os.listdir(camera_path):
if not image_file.lower().endswith(('.jpg', '.jpeg', '.png')):
continue
image_path = os.path.join(camera_path, image_file)
# 如果文件已处理过,则跳过
if image_path in processed_files:
continue
self._process_image(image_path)
processed_files.add(image_path)
print("等待新图片文件...")
time.sleep(10) # 每10秒检查一次
except Exception as e:
print(f"监控过程出错: {str(e)}")
time.sleep(10)
except KeyboardInterrupt:
print("\n程序已终止")
except Exception as e:
print(f"\n程序异常终止: {str(e)}")
raise
def main():
try:
images_path = PATH_CONFIG['images']
monitor = PoseMonitor(images_path)
monitor.monitor_directories()
except Exception as e:
print(f"\n未预期的错误: {str(e)}")
if __name__ == "__main__":
main()