111 lines
4.0 KiB
Python
111 lines
4.0 KiB
Python
import os
|
|
import time
|
|
from PIL import Image
|
|
import torch
|
|
from ultralytics import YOLO
|
|
|
|
class PoseMonitor:
|
|
def __init__(self, images_path):
|
|
self.images_path = images_path
|
|
self.crop_path = "/home/zydi/VLM/crop"
|
|
self.model = YOLO('/home/zydi/models/yolo11n-pose.pt') # 加载YOLOv8-pose模型
|
|
|
|
# 确保crop目录存在
|
|
if not os.path.exists(self.crop_path):
|
|
os.makedirs(self.crop_path)
|
|
|
|
def _process_image(self, image_path):
|
|
"""处理单张图片,检测人体并保存裁剪结果"""
|
|
try:
|
|
# 获取对应的保存路径
|
|
rel_path = os.path.relpath(image_path, self.images_path)
|
|
camera_dir = os.path.dirname(rel_path)
|
|
crop_dir = os.path.join(self.crop_path, camera_dir)
|
|
|
|
# 确保对应的相机目录存在
|
|
if not os.path.exists(crop_dir):
|
|
os.makedirs(crop_dir)
|
|
|
|
# 构建基础文件名
|
|
base_name = os.path.splitext(os.path.basename(image_path))[0]
|
|
|
|
# 使用YOLOPose进行检测
|
|
results = self.model(image_path)
|
|
|
|
# 获取原始图片
|
|
original_image = Image.open(image_path)
|
|
|
|
# 处理每个检测到的人体
|
|
for idx, box in enumerate(results[0].boxes.xyxy):
|
|
x1, y1, x2, y2 = box.tolist()
|
|
x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
|
|
|
|
# 裁剪人体区域
|
|
cropped = original_image.crop((x1, y1, x2, y2))
|
|
|
|
# 构建保存路径(添加后缀)
|
|
if idx == 0:
|
|
save_name = f"{base_name}_0.jpg"
|
|
else:
|
|
save_name = f"{base_name}_{idx}.jpg"
|
|
|
|
save_path = os.path.join(crop_dir, save_name)
|
|
|
|
# 保存裁剪后的图片
|
|
cropped.save(save_path)
|
|
print(f"已保存裁剪图片: {save_path}")
|
|
|
|
except Exception as e:
|
|
print(f"处理图片失败 {image_path}: {str(e)}")
|
|
|
|
def monitor_directories(self):
|
|
"""监控目录变化"""
|
|
try:
|
|
print(f"开始监控目录: {self.images_path}")
|
|
processed_files = set()
|
|
|
|
while True:
|
|
try:
|
|
# 处理所有图片文件
|
|
for camera_dir in os.listdir(self.images_path):
|
|
camera_path = os.path.join(self.images_path, camera_dir)
|
|
if not os.path.isdir(camera_path):
|
|
continue
|
|
|
|
for image_file in os.listdir(camera_path):
|
|
if not image_file.lower().endswith(('.jpg', '.jpeg', '.png')):
|
|
continue
|
|
|
|
image_path = os.path.join(camera_path, image_file)
|
|
|
|
# 如果文件已处理过,则跳过
|
|
if image_path in processed_files:
|
|
continue
|
|
|
|
self._process_image(image_path)
|
|
processed_files.add(image_path)
|
|
|
|
print("等待新图片文件...")
|
|
time.sleep(10) # 每10秒检查一次
|
|
|
|
except Exception as e:
|
|
print(f"监控过程出错: {str(e)}")
|
|
time.sleep(10)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n程序已终止")
|
|
except Exception as e:
|
|
print(f"\n程序异常终止: {str(e)}")
|
|
raise
|
|
|
|
def main():
|
|
try:
|
|
images_path = "images"
|
|
monitor = PoseMonitor(images_path)
|
|
monitor.monitor_directories()
|
|
|
|
except Exception as e:
|
|
print(f"\n未预期的错误: {str(e)}")
|
|
|
|
if __name__ == "__main__":
|
|
main() |