LISHUZUOXUN_yangjiang/Exercise_mn/run_around.py

212 lines
8.7 KiB
Python

import time
from copy import deepcopy
from threading import Lock
from threading import Thread
from MCamera.mn_algorithm import MoveNetAlgorithmPlugin
from MCamera.mn_camera import MN_RESULT
from MCamera.mp_camera import MP_RESULT
from MCamera.camera import *
from Speaker.speak_base import beep
from Database.manager_database import *
from score_doc import get_fin_score
from Exercise_mn.base_exercise import BaseExercise
class Runaround(BaseExercise):
def __init__(self, info, statistic_time=120, camera=None):
super().__init__(info, statistic_time, camera=camera)
# 计时
self.countdown_flag = threading.Event() # 计时线程
self.costtime = 0
self.countdown = 0
# 当前状态
self.direction = 0
# 动作持续帧数
self.time = 0
# 开始标志
self.form = 0
# 状态反馈
self.feedback = "请开始"
self.had_leaf = False
self.had_start = False
self.had_done = False
self.had_countdown = False
self.countdown_signal = True
self.pre_frame = None
self.retail_counting = 5
self.preempt_lock = Lock()
self.Preempt = 0
self.pretime = 0
self.racetime = 0
self.starttime = 0
self.endtime = 0
self.fix = 0
self.timefix = 0
self.catch_time = 0
self.exercise_type = "蛇形跑"
# 蛇形跑参数
self.corner = (0, 480, 200, 400)
self.config = (0.7, 50, 55, 60)
MoveNetAlgorithmPlugin.set_corner(corner=self.corner)
MoveNetAlgorithmPlugin.set_config(config=self.config)
def get_result(self):
# 人员年龄
age = self.info.get(AGE)
# 人员性别
gender = "woman" if self.info.get(GENDER) == "" else "man"
# if self.costtime == '未完成':
if self.costtime < 0:
score = 0
else:
score = get_fin_score.Military(gender, int(age),
serpentine60RunResult=round(self.costtime,
1)).Serpentine60RunScoreEvaluation()
result = {
"count": self.costtime,
"score": score,
"countdown": self.countdown,
"had_done": self.had_done,
'preempt': self.Preempt
}
return result
def get_info(self):
self.preempt_lock.acquire()
result = self.Preempt
self.preempt_lock.release()
return result
def is_valid(self):
return True
def _speak_counting(self):
threading.Thread(target=self.count_down).start()
counting = 0
first_time = self.catch_time
while True:
this_time = self.catch_time
if this_time - first_time > counting:
if not self.countdown_signal and not self.had_countdown:
self.speak_driver.add_speak(f"抢跑!")
self.pretime = self.catch_time
self.countdown_flag.set()
self.had_countdown = True
if self.retail_counting > 0:
if self.countdown_signal:
self.speak_driver.speed_control(350)
self.speak_driver.add_speak(f"{self.retail_counting}")
self.speak_driver.wait_4_speak()
counting += 1
self.retail_counting -= 1
else:
if self.countdown_signal:
threading.Thread(target=beep, args=(300, 640), daemon=True).start()
self.had_countdown = True
self.countdown_flag.set()
self.pretime = self.catch_time
break
self.had_start = True
self.starttime = self.catch_time
def count_down(self):
while True:
self.countdown_flag.wait()
self.costtime = self.catch_time - self.pretime
self.countdown = int(self.costtime)
time.sleep(0.1)
def run(self) -> None:
self.speak_driver.add_speak("考试人员{}".format(self.info[NAME]))
self.speak_driver.add_speak(f"考试项目{self.exercise_type}")
self.speak_driver.wait_4_speak()
self.cap.clear_cache()
# 记录开始时间
self.start_time = time.time()
self.is_start = True
# 不停更新计算和更新画面
threading.Thread(target=self.thread_counting_streaming, daemon=True).start()
while not self.is_done():
_img = deepcopy(self.cap.get_frame())
self.img = self.display(_img)
self.is_start = False
def display(self, img):
cv2.rectangle(img, (self.corner[2], self.corner[0]), (self.corner[3], self.corner[1]), (0, 255, 0), 2)
if not self.had_countdown:
cv2.putText(img, str(self.retail_counting), (230, 320), cv2.FONT_HERSHEY_PLAIN, 20, (255, 255, 255), 8)
return img
def analysis(self, frame):
self.catch_time = frame[CATCH_TIME]
if self.catch_time < self.start_cal_time:
return
img = frame[FRAME_DAT]
lm_list = frame[MN_RESULT]
img_input = img[self.corner[0]:self.corner[1], self.corner[2]:self.corner[3]]
if self.timefix == 0:
self.timefix = 1
Thread(target=self._speak_counting, daemon=True).start()
if not self.had_countdown:
self.detector.set_result(lm_list)
# self.starttime = time.time()
if len(lm_list) == 0:
self.time = self.time + 1
if self.fix == 0 and self.time > 3:
self.fix += 1
self.countdown_signal = False
self.preempt_lock.acquire()
self.racetime = self.catch_time
self.preempt_lock.release()
self.valid_mes()
self.is_valid()
# 如果没有背景图像就将当前帧当作背景图片
else:
# 计算抢跑时间
if self.starttime - self.racetime <= 5:
self.Preempt = round(self.starttime - self.racetime, 3)
elif 10 > self.starttime - self.racetime > 5:
self.Preempt = 5
else:
self.Preempt = 0
if self.had_start and not self.had_leaf and not self.had_done:
# 一直等待到目标完全离开检测区
if self.catch_time - self.pretime > 14:
self.had_leaf = True
else:
# 转灰度图
gray_pic = cv2.cvtColor(img_input, cv2.COLOR_BGR2GRAY)
# 用高斯滤波进行模糊处理
gray_pic = cv2.GaussianBlur(gray_pic, (21, 21), 0)
if self.pre_frame is None:
self.pre_frame = gray_pic
else:
# absdiff把两幅图的差的绝对值输出到另一幅图上面来
img_delta = cv2.absdiff(self.pre_frame, gray_pic)
# threshold阈值函数(原图像应该是灰度图,对像素值进行分类的阈值,当像素值高于(有时是小于)阈值时应该被赋予的新的像素值,阈值方法)
thresh = cv2.threshold(img_delta, 30, 255, cv2.THRESH_BINARY)[1]
# 用一下腐蚀与膨胀
thresh = cv2.dilate(thresh, None, iterations=2)
# findContours检测物体轮廓(寻找轮廓的图像,轮廓的检索模式,轮廓的近似办法)
contours, hierarchy = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for c in contours:
# 设置敏感度
if cv2.contourArea(c) < 7500:
continue
else:
if self.had_start and self.had_leaf and not self.had_done:
self.endtime = self.catch_time
self.had_done = True
self.costtime = round(self.endtime - self.starttime + self.Preempt, 3)
self.countdown_flag.clear()
# print("总耗时 = {}".format(self.costtime))
self.speak_driver.speed_control(200)
Thread(
target=self.speak_driver.add_speak,
args=(f"抢跑时间为{round(self.Preempt, 2)}秒,总耗时为{round(self.costtime, 2)}",)
).start()
break
self.pre_frame = gray_pic