import time from copy import deepcopy from threading import Lock from threading import Thread from MCamera.mp_algorithm import MediapipeAlgorithmPlugin from MCamera.mp_camera import MP_RESULT from MCamera.camera import * from Speaker.speak_base import beep from Database.manager_database import * from score_doc import get_fin_score from .base_exercise import BaseExercise class Runaround(BaseExercise): def __init__(self, info, statistic_time=120, camera=None): super().__init__(info, statistic_time, camera=camera) # 计时 self.countdown_flag = threading.Event() # 计时线程 self.costtime = 0 self.countdown = 0 # 当前状态 self.direction = 0 # 动作持续帧数 self.time = 0 # 开始标志 self.form = 0 # 状态反馈 self.feedback = "请开始" self.had_leaf = False self.had_start = False self.had_done = False self.had_countdown = False self.countdown_signal = True self.pre_frame = None self.retail_counting = 5 self.preempt_lock = Lock() self.Preempt = 0 self.pretime = 0 self.racetime = 0 self.starttime = 0 self.endtime = 0 self.fix = 0 self.timefix = 0 self.catch_time = 0 self.exercise_type = "蛇形跑" # 蛇形跑参数 self.corner = (0, 480, 200, 400) MediapipeAlgorithmPlugin.set_corner(corner=self.corner) MediapipeAlgorithmPlugin.set_config(config=self.config) def get_result(self): # 人员年龄 age = self.info.get(AGE) # 人员性别 gender = "woman" if self.info.get(GENDER) == "女" else "man" # if self.costtime == '未完成': if self.costtime < 0: score = 0 else: score = get_fin_score.Military(gender, int(age), serpentine60RunResult=round(self.costtime, 1)).Serpentine60RunScoreEvaluation() result = { "count": self.costtime, "score": score, "countdown": self.countdown, "had_done": self.had_done, 'preempt': self.Preempt } return result def get_info(self): self.preempt_lock.acquire() result = self.Preempt self.preempt_lock.release() return result def is_valid(self): return True def _speak_counting(self): threading.Thread(target=self.count_down).start() counting = 0 first_time = self.catch_time while True: this_time = self.catch_time if this_time - first_time > counting: if not self.countdown_signal and not self.had_countdown: self.speak_driver.add_speak(f"抢跑!") self.pretime = self.catch_time self.countdown_flag.set() self.had_countdown = True if self.retail_counting > 0: if self.countdown_signal: self.speak_driver.speed_control(350) self.speak_driver.add_speak(f"{self.retail_counting}") self.speak_driver.wait_4_speak() counting += 1 self.retail_counting -= 1 else: if self.countdown_signal: threading.Thread(target=beep, args=(300, 640), daemon=True).start() self.had_countdown = True self.countdown_flag.set() self.pretime = self.catch_time break self.had_start = True self.starttime = self.catch_time def count_down(self): while True: self.countdown_flag.wait() self.costtime = self.catch_time - self.pretime self.countdown = int(self.costtime) time.sleep(0.1) def run(self) -> None: self.speak_driver.add_speak("考试人员{}".format(self.info[NAME])) self.speak_driver.add_speak(f"考试项目{self.exercise_type}") self.speak_driver.wait_4_speak() self.cap.clear_cache() # 记录开始时间 self.start_time = time.time() self.is_start = True # 不停更新计算和更新画面 threading.Thread(target=self.thread_counting_streaming, daemon=True).start() while not self.is_done(): _img = deepcopy(self.cap.get_frame()) self.img = self.display(_img) self.is_start = False def display(self, img): cv2.rectangle(img, (self.corner[2], self.corner[0]), (self.corner[3], self.corner[1]), (0, 255, 0), 2) if not self.had_countdown: cv2.putText(img, str(self.retail_counting), (230, 320), cv2.FONT_HERSHEY_PLAIN, 20, (255, 255, 255), 8) return img def analysis(self, frame): self.catch_time = frame[CATCH_TIME] if self.catch_time < self.start_cal_time: return img = frame[FRAME_DAT] lm_list = frame[MP_RESULT] img_input = img[self.corner[0]:self.corner[1], self.corner[2]:self.corner[3]] if self.timefix == 0: self.timefix = 1 Thread(target=self._speak_counting, daemon=True).start() if not self.had_countdown: self.detector.set_result(lm_list) # self.starttime = time.time() if len(lm_list) == 0: self.time = self.time + 1 if self.fix == 0 and self.time > 3: self.fix += 1 self.countdown_signal = False self.preempt_lock.acquire() self.racetime = self.catch_time self.preempt_lock.release() self.valid_mes() self.is_valid() # 如果没有背景图像就将当前帧当作背景图片 else: # 计算抢跑时间 if self.starttime - self.racetime <= 5: self.Preempt = round(self.starttime - self.racetime, 3) elif 10 > self.starttime - self.racetime > 5: self.Preempt = 5 else: self.Preempt = 0 if self.had_start and not self.had_leaf and not self.had_done: # 一直等待到目标完全离开检测区 if self.catch_time - self.pretime > 14: self.had_leaf = True else: # 转灰度图 gray_pic = cv2.cvtColor(img_input, cv2.COLOR_BGR2GRAY) # 用高斯滤波进行模糊处理 gray_pic = cv2.GaussianBlur(gray_pic, (21, 21), 0) if self.pre_frame is None: self.pre_frame = gray_pic else: # absdiff把两幅图的差的绝对值输出到另一幅图上面来 img_delta = cv2.absdiff(self.pre_frame, gray_pic) # threshold阈值函数(原图像应该是灰度图,对像素值进行分类的阈值,当像素值高于(有时是小于)阈值时应该被赋予的新的像素值,阈值方法) thresh = cv2.threshold(img_delta, 30, 255, cv2.THRESH_BINARY)[1] # 用一下腐蚀与膨胀 thresh = cv2.dilate(thresh, None, iterations=2) # findContours检测物体轮廓(寻找轮廓的图像,轮廓的检索模式,轮廓的近似办法) contours, hierarchy = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) for c in contours: # 设置敏感度 if cv2.contourArea(c) < 7500: continue else: if self.had_start and self.had_leaf and not self.had_done: self.endtime = self.catch_time self.had_done = True self.costtime = round(self.endtime - self.starttime + self.Preempt, 3) self.countdown_flag.clear() # print("总耗时 = {}".format(self.costtime)) self.speak_driver.speed_control(200) Thread( target=self.speak_driver.add_speak, args=(f"抢跑时间为{round(self.Preempt, 2)}秒,总耗时为{round(self.costtime, 2)}秒",) ).start() break self.pre_frame = gray_pic