import queue import time import traceback from copy import copy from multiprocessing import Process, Queue from MCamera.camera import * from MCamera.mn_algorithm import MoveNetAlgorithmPlugin MN_RESULT = "mn_result" CORNER = "corner" CONFIG = "config" CROPPED_FRAME = "crp_frame" STREAM_CORE_NUMBER = 4 class Camera(Camera): frame_arrival = threading.Event() frame_arrival.clear() cache_arrival = threading.Event() cache_arrival.clear() to_be_proc_queue_list = [Queue() for _ in range(STREAM_CORE_NUMBER)] processed_queue = Queue() processing_mp_started = False FORGET_TIME = 1 def __init__(self): super().__init__() if not Camera.processing_mp_started: # 启动4个计算流核心 for i in range(STREAM_CORE_NUMBER): Process( target=self._processing_mp_pose_detect, args=(Camera.to_be_proc_queue_list[i], Camera.processed_queue), daemon=True ).start() Camera.processing_mp_started = True threading.Thread(target=Camera._threading_data_gather, daemon=True).start() threading.Thread(target=Camera._threading_data_destroy, daemon=True).start() @classmethod def _threading_data_destroy(cls): while True: this_time = time.time() cls.queue_frame = list(filter(lambda x: this_time - x[CATCH_TIME] > 120, cls.queue_frame)) time.sleep(1) @classmethod def _threading_data_gather(cls): # start_time = time.time() # count = 0 while True: try: data = Camera.processed_queue.get() # if time.time() - data[CATCH_TIME] > Camera.FORGET_TIME: # continue cls.queue_frame.append(data) cls.queue_frame.sort(key=lambda x: x[CATCH_TIME]) cls.cache_arrival.set() # count += 1 # this_time = time.time() # print(this_time - data[CATCH_TIME]) # if this_time - start_time > 1: # print(count, this_time - start_time) # start_time = this_time # count = 0 except queue.Empty: return @classmethod def get_cache(cls, timeout=0.1): cls.cache_arrival.wait(timeout) if cls.queue_frame: return cls.queue_frame.pop(0) else: cls.cache_arrival.clear() @classmethod def clear_cache(cls): while True: try: Camera.processed_queue.get_nowait() except queue.Empty: cls.queue_frame.clear() return @classmethod def _thread(cls, video_source): """Camera background thread.""" print(f'Starting CAMERA@{video_source} thread.') frame_no = 0 frames_iterator = cls.frames(video_source) for frame in frames_iterator: if cls.record_signal: fix_time = 1 / Camera.fps - (time.time() - Camera.last_access) if fix_time > 0: # print("sleep:", fix_time) continue try: frame_pkg = { CAMERA_ID: video_source, FRAME_DAT: frame, CATCH_TIME: time.time(), CORNER: MoveNetAlgorithmPlugin.get_corner(), CONFIG: MoveNetAlgorithmPlugin.get_config() } Camera.to_be_proc_queue_list[frame_no].put(frame_pkg) frame_no = 0 if frame_no + 1 == STREAM_CORE_NUMBER else frame_no + 1 except Exception as e: print(e.args) print(traceback.format_exc()) continue Camera.frame[video_source] = frame Camera.event[video_source].set() # send signal to clients @staticmethod def _processing_mp_pose_detect(to_be_processing_queue: Queue, processed_queue: Queue): MoveNetAlgorithmPlugin.class_init() mn_algo = MoveNetAlgorithmPlugin() # start_time = time.time() # count = 0 while True: try: data = to_be_processing_queue.get() # catch_time = data[CATCH_TIME] mn_algo.set_config(config=data[CONFIG]) mn_algo.set_corner(corner=data[CORNER]) lm_list, frame_cropped = mn_algo.find_pose(img=data[FRAME_DAT]) data[MN_RESULT] = lm_list data[CROPPED_FRAME] = frame_cropped # print(time.time() - data[CATCH_TIME]) # count += 1 # this_time = time.time() # if this_time - start_time >= 1: # print(count, this_time - catch_time, this_time - start_time) # start_time = this_time # count = 0 processed_queue.put(data) except Exception as e: print(e.args) print(traceback.format_exc()) break