import os import time from copy import deepcopy import numpy as np from PIL import Image, ImageDraw, ImageFont from AcrossPlatform.get_platform import GLOBAL_DIR from MCamera.mp_camera import * from Speaker import speak_base from Speaker.speak_base import beep from Database.manager_database import NAME from realtime_streaming import img_format_jpg def base_detect_image(camera: Camera): _map = MediapipeAlgorithmPlugin() while True: _img = deepcopy(camera.get_frame()) img = _map.find_pose_with_drawing(_img, True) yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + img_format_jpg(img) + b'\r\n' class BaseExercise(threading.Thread): def __init__(self, info, statistic_time=120, camera: Camera = None): super().__init__(daemon=True) # 初始化窗体设置 self.detector = MediapipeAlgorithmPlugin() # 摄像头对象 if not camera: self.cap = Camera() else: self.cap = camera # 个数统计 self.count = 0 # 当前状态 self.dir = 0 # 统计时间 self.statistic_time = statistic_time # 开始时间 self.start_time = time.time() # 開始計算時間 self.start_cal_time = time.time() # 强制结束标志 self.kill_sign = False # 当前画面 self.img = None # 记录是否开始 self.is_start = False # 获得人员信息 self.info = info # 设置摄像头长时间不关闭 Camera.AUTO_CLOSE_TIMEOUT = float('inf') self.exercise_type = "" self.bright = -5 self.speak_driver = speak_base.SpeakServer() self.corner = (0, 480, 0, 640) self.config = (True, 1, True, False, True, 0.5, 0.5) def start(self) -> None: super().start() self.start_cal_time = time.time() def get_info(self): return None def is_done(self): if time.time() - self.start_time > self.statistic_time or self.kill_sign: Camera.stop_record() return True else: return False def get_result(self): result = {} return result # 判断作训是否有效 def is_valid(self): return True # 作训回传文字 @staticmethod def valid_mes(): return "" def tran_count(self): return self.count def speak_counting(self, counting_times, name): self.speak_driver.start() self.speak_driver.speed_control(200) self.speak_driver.volume_control(1) self.speak_driver.add_speak("考试人员{}".format(name)) self.speak_driver.add_speak(f"考试项目{self.exercise_type}") self.speak_driver.add_speak(f"倒计时!") retail_counting = counting_times counting = 0 start_time = time.time() while True: this_time = time.time() if this_time - start_time > counting: if retail_counting > 0: self.speak_driver.add_speak(f"{retail_counting}!") self.speak_driver.wait_4_speak() counting += 1 retail_counting -= 1 else: break threading.Thread(target=beep, daemon=True).start() # 作訓計算數據流 def analysis(self, frame): pass def thread_counting_streaming(self): self.cap.clear_cache() # start_time = time.time() # count = 0 while not self.is_done(): frame = self.cap.get_cache() if frame: # catch_time = frame[CATCH_TIME] self.analysis(frame=frame) # count += 1 # if catch_time - start_time > 1: # print(count, time.time() - catch_time, catch_time - start_time) # start_time = catch_time # count = 0 # 作訓視頻流 def display(self, img): return img def run(self) -> None: self.speak_counting(5, self.info[NAME]) # 清除历史记录 # 记录开始时间 self.start_time = time.time() self.is_start = True # 不停更新计算和更新画面 threading.Thread(target=self.thread_counting_streaming, daemon=True).start() while not self.is_done(): _img = self.cap.get_frame() self.img = self.display(_img) self.is_start = False def skeleton_video(self): _img = deepcopy(self.cap.get_frame()) self.img = self.detector.find_pose_with_drawing(_img, True) return self.img def pure_streaming_gen(self): if not self.kill_sign: if self.is_start and self.img is not None: return self.img else: return self.skeleton_video() def pure_video_gen(self): if not self.kill_sign: self.img = cv2.rectangle( self.skeleton_video(), (self.corner[2], self.corner[0]), (self.corner[3], self.corner[1]), (0, 255, 0), 2 ) return self.img def streaming_gen(self): while not self.kill_sign: if self.is_start and self.img is not None: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + img_format_jpg(self.img) + b'\r\n') else: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + img_format_jpg(self.skeleton_video()) + b'\r\n') def video_gen(self): while not self.kill_sign: self.img = cv2.rectangle( self.skeleton_video(), (self.corner[2], self.corner[0]), (self.corner[3], self.corner[1]), (0, 255, 0), 2 ) yield ( b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + img_format_jpg(self.img) + b'\r\n' ) def waiting_for_start(self): while not self.is_start: time.sleep(0.001) def kill(self): self.kill_sign = True self.speak_driver.stop() while self.is_alive(): time.sleep(0.1) @staticmethod def cv2_img_add_text(img, text, left, top, text_color=(0, 255, 0), text_size=30): if isinstance(img, np.ndarray): # 判断是否OpenCV图片类型 img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) # 创建一个可以在给定图像上绘图的对象 draw = ImageDraw.Draw(img, None) # 字体的格式 font_type_dir = os.path.join(GLOBAL_DIR, "Exercise3/simsun.ttc") fontstyle = ImageFont.truetype(font_type_dir, text_size, encoding="utf-8") # 绘制文本 draw.text((left, top), text, text_color, font=fontstyle) # 转换回OpenCV格式 return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)