LISHUZUOXUN_yangjiang/Exercise3/base_exercise.py

222 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
from copy import deepcopy
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from AcrossPlatform.get_platform import GLOBAL_DIR
from MCamera.mp_camera import *
from Speaker import speak_base
from Speaker.speak_base import beep
from Database.manager_database import NAME
from realtime_streaming import img_format_jpg
def base_detect_image(camera: Camera):
_map = MediapipeAlgorithmPlugin()
while True:
_img = deepcopy(camera.get_frame())
img = _map.find_pose_with_drawing(_img, True)
yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + img_format_jpg(img) + b'\r\n'
class BaseExercise(threading.Thread):
def __init__(self, info, statistic_time=120, camera: Camera = None):
super().__init__(daemon=True)
# 初始化窗体设置
self.detector = MediapipeAlgorithmPlugin()
# 摄像头对象
if not camera:
self.cap = Camera()
else:
self.cap = camera
# 个数统计
self.count = 0
# 当前状态
self.dir = 0
# 统计时间
self.statistic_time = statistic_time
# 开始时间
self.start_time = time.time()
# 開始計算時間
self.start_cal_time = time.time()
# 强制结束标志
self.kill_sign = False
# 当前画面
self.img = None
# 记录是否开始
self.is_start = False
# 获得人员信息
self.info = info
# 设置摄像头长时间不关闭
Camera.AUTO_CLOSE_TIMEOUT = float('inf')
self.exercise_type = ""
self.bright = -5
self.speak_driver = speak_base.SpeakServer()
self.corner = (0, 480, 0, 640)
self.config = (True, 1, True, False, True, 0.5, 0.5)
def start(self) -> None:
super().start()
self.start_cal_time = time.time()
def get_info(self):
return None
def is_done(self):
if time.time() - self.start_time > self.statistic_time or self.kill_sign:
Camera.stop_record()
return True
else:
return False
def get_result(self):
result = {}
return result
# 判断作训是否有效
def is_valid(self):
return True
# 作训回传文字
@staticmethod
def valid_mes():
return ""
def tran_count(self):
return self.count
def speak_counting(self, counting_times, name):
self.speak_driver.start()
self.speak_driver.speed_control(200)
self.speak_driver.volume_control(1)
self.speak_driver.add_speak("考试人员{}".format(name))
self.speak_driver.add_speak(f"考试项目{self.exercise_type}")
self.speak_driver.add_speak(f"倒计时!")
retail_counting = counting_times
counting = 0
start_time = time.time()
while True:
this_time = time.time()
if this_time - start_time > counting:
if retail_counting > 0:
self.speak_driver.add_speak(f"{retail_counting}")
self.speak_driver.wait_4_speak()
counting += 1
retail_counting -= 1
else:
break
threading.Thread(target=beep, daemon=True).start()
# 作訓計算數據流
def analysis(self, frame):
pass
def thread_counting_streaming(self):
self.cap.clear_cache()
# start_time = time.time()
# count = 0
while not self.is_done():
frame = self.cap.get_cache()
if frame:
# catch_time = frame[CATCH_TIME]
self.analysis(frame=frame)
# count += 1
# if catch_time - start_time > 1:
# print(count, time.time() - catch_time, catch_time - start_time)
# start_time = catch_time
# count = 0
# 作訓視頻流
def display(self, img):
return img
def run(self) -> None:
self.speak_counting(5, self.info[NAME])
# 清除历史记录
# 记录开始时间
self.start_time = time.time()
self.is_start = True
# 不停更新计算和更新画面
threading.Thread(target=self.thread_counting_streaming, daemon=True).start()
while not self.is_done():
_img = self.cap.get_frame()
self.img = self.display(_img)
self.is_start = False
def skeleton_video(self):
_img = deepcopy(self.cap.get_frame())
self.img = self.detector.find_pose_with_drawing(_img, True)
return self.img
def pure_streaming_gen(self):
if not self.kill_sign:
if self.is_start and self.img is not None:
return self.img
else:
return self.skeleton_video()
def pure_video_gen(self):
if not self.kill_sign:
self.img = cv2.rectangle(
self.skeleton_video(),
(self.corner[2], self.corner[0]),
(self.corner[3], self.corner[1]),
(0, 255, 0), 2
)
return self.img
def streaming_gen(self):
while not self.kill_sign:
if self.is_start and self.img is not None:
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + img_format_jpg(self.img) + b'\r\n')
else:
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + img_format_jpg(self.skeleton_video()) + b'\r\n')
def video_gen(self):
while not self.kill_sign:
self.img = cv2.rectangle(
self.skeleton_video(),
(self.corner[2], self.corner[0]),
(self.corner[3], self.corner[1]),
(0, 255, 0), 2
)
yield (
b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
+ img_format_jpg(self.img)
+ b'\r\n'
)
def waiting_for_start(self):
while not self.is_start:
time.sleep(0.001)
def kill(self):
self.kill_sign = True
self.speak_driver.stop()
while self.is_alive():
time.sleep(0.1)
@staticmethod
def cv2_img_add_text(img, text, left, top, text_color=(0, 255, 0), text_size=30):
if isinstance(img, np.ndarray): # 判断是否OpenCV图片类型
img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
# 创建一个可以在给定图像上绘图的对象
draw = ImageDraw.Draw(img, None)
# 字体的格式
font_type_dir = os.path.join(GLOBAL_DIR, "Exercise3/simsun.ttc")
fontstyle = ImageFont.truetype(font_type_dir, text_size, encoding="utf-8")
# 绘制文本
draw.text((left, top), text, text_color, font=fontstyle)
# 转换回OpenCV格式
return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)