# coding: gb2312
"""Manager-side driver: aggregates live scores, vital-sign alarms and summary charts."""
import copy
import json
import os
import threading
import time
import traceback

from flask import send_file

from Database.database_mgr import *
from AcrossPlatform.get_platform import GLOBAL_DIR
from DeviceDefine.consensus import *
from LogRecord.log_recorder import GLOBAL_LOG
from PureBackend.base_driver import MODEL_MEDIAPIPE
from PureBackend.standard_manager import StandardManager
from PureBackend.general import *
from Speaker.fake_speak_base import beep

video_path = os.path.join(GLOBAL_DIR, "LSZXVideo", "Video")
SUMMARY_FILE = os.path.join(GLOBAL_DIR, "summary_info.json")


def _log_failure(action, error, need_print=False):
    """Write the standard "<action> failed" record to GLOBAL_LOG.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` still sees the active exception.
    ``need_print`` is only forwarded when set, so calls without it keep using
    whatever default ``GLOBAL_LOG.write`` defines.
    """
    message = f"{action}发生错误,{str(error)},错误来源:{traceback.format_exc()}"
    if need_print:
        GLOBAL_LOG.write(message, need_print=True)
    else:
        GLOBAL_LOG.write(message)


class ManagerDriver(StandardManager):
    """Driver used by the management station.

    It collects per-device live scores, merges freshly reported results into
    the database snapshot, tracks heart-rate / blood-oxygen alarms and serves
    the data behind the summary-analysis charts.
    """

    def __init__(self, master_mode=True, positioning=True, camera=True, model=MODEL_MEDIAPIPE, speaker=True,
                 multi_positioning_mode=True, device_type=UNKNOWN, pure_mode=False):
        super().__init__(master_mode, positioning, camera, model, speaker, multi_positioning_mode, device_type,
                         pure_mode)
        # Scores that are pushed to the management front end.
        self.synchronization_info = {}
        self.statistics_score = {SITUP: [], RUNAROUND: [], PUSHUP: [], PULLUP: [], RUNNING: [], 'total': [],
                                 'top': {'total': {}, SITUP: {}, RUNAROUND: {}, PULLUP: {}, PUSHUP: {}, RUNNING: {}}}
        self._statistics_score = {}      # latest live payload per reporting ip
        self._statistics_sql_score = {}
        self._sql_score = {}             # untouched copy of each database row, keyed by person id
        self.hrbo_info = {}              # people currently flagged as abnormal, keyed by person id
        self._hrbo_info = {}             # latest raw vital-sign reading per band
        self.closed_chamber = {}         # person id -> time the alarm was acknowledged (muted for 5 minutes)
        self.tag_mes = {HR: None, BO: None}
        if device_type == MANAGER:
            self.sql_data = self.manager.select_all_score()
            for row in self.sql_data:
                row.update({'hr': {'hr': '', 'color': 'W'}, 'bo': {'bo': '', 'color': 'W'}})
                # Snapshot the row before the *_count cells are rewritten below.
                self._sql_score.update({row['id']: copy.deepcopy(row)})
                for key in row.keys():
                    if '_count' in key:
                        score = str(row[key]) if row[key] else ''
                        row[key] = {'count': score, 'is_changed': False}
            threading.Thread(target=self.processing_sql).start()
        # NOTE(review): the original indentation was lost; the event and its worker are
        # created unconditionally here so that every method that touches the event works
        # for any device type - confirm whether they were meant to be manager-only.
        self.processing_score_signal = threading.Event()
        threading.Thread(target=self.processing_score).start()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _last_key(mapping):
        """Return the last key of *mapping*, or None when it is empty."""
        last = None
        for key in mapping:
            last = key
        return last

    @staticmethod
    def _leading_number(cell):
        """Return the number in front of an optional "(previous)" suffix as a float."""
        return float(str(cell).split('(')[0])

    @staticmethod
    def _person_rows(people):
        """Convert statistic records into the id/name/score/count rows the front end expects."""
        return [
            {
                "id": person[ID],
                "name": person[NAME],
                "score": person[SCORE],
                "count": person[RECORD]
            }
            for person in people
        ]

    def _expire_closed_chamber(self):
        """Drop acknowledged alarms that were muted more than 300 seconds ago."""
        for person_id, muted_at in self.closed_chamber.copy().items():
            if time.time() - int(muted_at) > 300:
                del self.closed_chamber[person_id]

    # ------------------------------------------------------------------ videos

    def video_list(self, person_id, project):
        """List recorded videos of one person for one project, newest first.

        File names are expected to look like
        ``<person>_<?>_<project>_<score>_<Y-M-D-h-m>.<ext>``; files that do not
        follow this pattern are skipped instead of aborting the whole listing.
        Returns a list of dicts, or None when an error was logged.
        """
        try:
            response_data = []
            for file_name in os.listdir(video_path):
                fields = file_name.rsplit('.', 1)[0].split('_')
                if len(fields) < 5:
                    continue  # unrelated file in the video directory
                if fields[0] == person_id and fields[2] == project:
                    t = fields[4].split('-')
                    if len(t) < 5:
                        continue  # malformed timestamp
                    date = "{}年{}月{}日{}时{}分".format(t[0], t[1], t[2], t[3], t[4])
                    response_data.append({'name': file_name, 'date': date, 'project': fields[2],
                                          'score': fields[3]})
            response_data.sort(key=lambda item: (item['date'], item['score']), reverse=True)
            return response_data
        except Exception as e:
            _log_failure("获取视频列表", e, need_print=True)

    def video_server(self, file_name):
        """Send one video file from the video directory.

        ``file_name`` comes from the request, so it is resolved and rejected
        when it would escape ``video_path`` (e.g. ``../../secret``).
        Returns the Flask response, or None when an error was logged.
        """
        try:
            base_dir = os.path.abspath(video_path)
            video_dir = os.path.abspath(os.path.join(base_dir, file_name))
            if os.path.commonpath([base_dir, video_dir]) != base_dir:
                raise ValueError(f"illegal video file name: {file_name!r}")
            return send_file(video_dir)
        except Exception as e:
            _log_failure("获取视频", e, need_print=True)

    # ------------------------------------------------------------------ live scores

    def totals_synchronization_server(self, cmd):
        """Store the live score payload of one device and wake the aggregation worker.

        ``cmd['ip']`` identifies the device; ``cmd['data']`` is stamped with the
        receive time under the key ``'time'`` and replaces the previous payload.
        """
        try:
            ip = cmd['ip']
            cmd['data']['time'] = time.time()
            self._statistics_score[ip] = cmd['data']
            self.processing_score_signal.set()
        except Exception as e:
            _log_failure("汇总成绩", e)

    def processing_score(self):
        """Worker loop: rebuild the big-screen live ranking whenever the signal is set."""
        while True:
            try:
                self.processing_score_signal.wait()
                self.processing_score_signal.clear()
                new_statistics_score = {SITUP: [], RUNAROUND: [], PUSHUP: [], PULLUP: [], RUNNING: []}
                for _ip, payload in self._statistics_score.items():
                    for key, value in payload.items():
                        if key == RUNNING:
                            # Running arrives as a complete list; the last device wins.
                            new_statistics_score[key] = value
                            new_statistics_score[key].sort(
                                key=lambda x: (1 if x['normal'] else 0, x['rank']))
                        elif key != 'time':
                            new_statistics_score.setdefault(key, [])
                            if value:
                                new_statistics_score[key].append(value)
                            if key == 'runaround':
                                # Shuttle run: smaller count ranks first.
                                new_statistics_score[key].sort(
                                    key=lambda x: (1 if x['normal'] else 0, x['count']))
                            else:
                                new_statistics_score[key].sort(
                                    key=lambda x: (1 if x['normal'] else 0, -x['count']))
                        if key != 'time':
                            # Flag abnormal participants unless their alarm is currently muted.
                            for entry in new_statistics_score[key]:
                                if not entry['normal'] and int(entry['id']) not in self.closed_chamber:
                                    self.hrbo_info.update({int(entry['id']): entry})
                self.statistics_score.update(new_statistics_score)
                # Release muted alarms after 5 minutes.
                self._expire_closed_chamber()
            except Exception as e:
                _log_failure("处理大屏实时成绩", e)

    # ------------------------------------------------------------------ database scores

    def totals_sql_synchronization_server(self, cmd):
        """Merge freshly reported results of one exercise into the database snapshot.

        ``cmd['data']`` maps an exercise tag to ``{person_id: {'score', 'record'}}``;
        only the first exercise tag is processed. A changed cell is rendered as
        ``"new(old)"`` when a previous record exists.
        """
        try:
            print(cmd)
            if cmd['data']:
                exercise_tag = next(iter(cmd['data']))
                if exercise_tag != RUNAROUND:
                    record = str(exercise_tag) + '_' + 'count'
                    score = str(exercise_tag) + '_' + 'score'
                else:
                    record = 'run_bf_count'
                    score = 'run_bf_score'
                for row in self.sql_data:
                    for key, value in cmd['data'][exercise_tag].items():
                        if row['id'] == key:
                            previous = self._sql_score[key][record]
                            row[score] = value['score']
                            current = str(round(value['record'], 2))
                            if previous:
                                row[record]['count'] = current + '(' + str(previous) + ')'
                            else:
                                row[record]['count'] = current
                            row[record]['is_changed'] = True
                # NOTE(review): original nesting was lost; recomputation is started only
                # when data was supplied - confirm.
                threading.Thread(target=self.processing_sql).start()
        except Exception as e:
            _log_failure("汇总数据库成绩", e)

    def processing_sql(self):
        """Recompute the overall table and the per-exercise top performers from ``self.sql_data``."""
        try:
            response_data = self.sql_data
            total = [
                {'hr': row['hr'], 'bo': row['bo'], 'id': row['id'], 'name': row['name'],
                 SITUP: row['situp_count'], RUNAROUND: row['run_bf_count'], PUSHUP: row['pushup_count'],
                 PULLUP: row['pullup_count'], RUNNING: row['running_count']}
                for row in response_data]
            top_score = 0
            # Exercises where a larger count is better, with their column name.
            higher_is_better = ((PULLUP, 'pullup_count'), (PUSHUP, 'pushup_count'), (SITUP, 'situp_count'))
            best = {PULLUP: 0, PUSHUP: 0, SITUP: 0}
            # Timed exercises: smaller is better, starting from an upper bound.
            top_runround = 40
            top_running = 1800
            top = self.statistics_score['top']
            for row in response_data:
                if row['pullup_score'] and row['situp_score'] and row['running_score'] and row['run_bf_score']:
                    total_score = (float(row['pullup_score']) + float(row['situp_score']) +
                                   float(row['running_score']) + float(row['run_bf_score'])) / 4
                    if total_score > top_score:
                        top_score = total_score
                        top['total'] = {'name': row['name'], 'score': top_score}
                for tag, column in higher_is_better:
                    cell = row[column]['count']
                    if cell:
                        # Each exercise parses its own cell (the sit-up branch used to
                        # inspect the push-up cell and could crash on "new(old)").
                        count = self._leading_number(cell)
                        if count > best[tag]:
                            best[tag] = count
                            top[tag] = {'name': row['name'], 'score': count}
                cell = row['run_bf_count']['count']
                if cell:
                    count = self._leading_number(cell)
                    if top_runround > count > 0:
                        top_runround = count
                        top[RUNAROUND] = {'name': row['name'], 'score': top_runround}
                cell = row['running_count']['count']
                if cell:
                    count = self._leading_number(cell)
                    if count < top_running:
                        top_running = count
                        top[RUNNING] = {'name': row['name'], 'score': top_running}
            self.statistics_score['total'] = total
        except Exception as e:
            _log_failure("处理数据库成绩", e)

    def statistics(self):
        """Return the aggregated score board after dropping devices silent for more than 60 s.

        Sleeps 0.5 s first. Returns None when an error was logged.
        """
        try:
            time.sleep(0.5)
            self.statistics_score.update({STATUS: OK})
            for ip, payload in self._statistics_score.copy().items():
                if time.time() - payload['time'] > 60:
                    del self._statistics_score[ip]
                    # NOTE(review): original nesting was lost; the rebuild is requested
                    # only when a stale device was removed - confirm.
                    self.processing_score_signal.set()
            return self.statistics_score
        except Exception as e:
            _log_failure("statistics", e)

    # ------------------------------------------------------------------ heart rate / blood oxygen

    def totals_HrBoInfo_server(self, cmd):
        """Receive heart-rate / blood-oxygen readings and flag abnormal participants.

        ``cmd["data"]`` maps a band identifier to its reading; each reading is
        expected to carry ``record_time``, ``hr``, ``bo`` and ``normal``.
        """
        try:
            data = cmd["data"]
            # Release muted alarms after 5 minutes.
            self._expire_closed_chamber()
            # Merge the new readings into the per-band cache.
            for band, reading in data.items():
                self._hrbo_info[band] = reading
            # Pick out the people that need a warning.
            for band, reading in self._hrbo_info.copy().items():
                if time.time() - reading['record_time'] > 120:
                    # NOTE(review): the stale reading is removed from the cache but is
                    # still processed once below, as in the original - confirm intent.
                    del self._hrbo_info[band]
                p_data = self.manager.select_a_person_from_band(band)
                if p_data:
                    p_id = int(p_data[0]["id"])
                    p_name = p_data[0]["name"]
                    for row in self.statistics_score['total']:
                        if p_id == int(row[ID]):
                            row['hr'] = reading['hr']
                            row['bo'] = reading['bo']
                            break
                    if not reading["normal"] and p_id not in self.closed_chamber:
                        self.hrbo_info.update({p_id: reading})
                        self.hrbo_info[p_id]["name"] = p_name
        except Exception as e:
            _log_failure("接收心率血氧信息", e)

    def abnormal(self):
        """Return the currently abnormal participants and sound the alarm for them.

        Sleeps 0.5 s first. Returns a list sorted by id, or None when nobody is
        flagged or an error was logged.
        """
        # NOTE(review): report_tag is a local that is always "", so the comparison
        # below is always true and the alarm repeats on every call while anyone is
        # flagged. Kept as-is; persist the tag on self if de-duplication is wanted.
        report_tag = ""
        try:
            time.sleep(0.5)
            if self.hrbo_info:
                data = [{"name": value["name"], "id": key, HR: value[HR], BO: value[BO]}
                        for key, value in self.hrbo_info.items()]
                data.sort(key=lambda item: item['id'])
                new_report_tag = "@".join([str(item['id']) for item in data])
                if report_tag != new_report_tag:
                    beep(duration=1500, freq=640)
                    self.speak_driver.add_speak("体征监测系统警报!")
                    for d in data:
                        self.speak_driver.add_speak(f"{d['name']}体征异常!请尽快救护!")
                return data
        except Exception as e:
            _log_failure("心率血氧有误", e, need_print=True)

    def receive(self, data):
        """Acknowledge an alarm: clear the flagged list and mute the given people.

        ``data`` is an iterable of dicts with an ``"id"``. Returns True on
        success, False when an error was logged.
        """
        try:
            self.hrbo_info = {}
            # Put the acknowledged people into the muted list.
            for person in data:
                self.closed_chamber.update({int(person["id"]): time.time()})
            return True
        except Exception as e:
            _log_failure("心率血氧有误回传信号给后端", e, need_print=True)
            return False

    # ------------------------------------------------------------------ summary analysis: overview

    def get_all_projects(self):
        """Return every exercise as ``{'value': key, 'label': name}`` (None on error)."""
        try:
            return [{'value': k, 'label': v} for k, v in EXERCISE_NAME_LIST.items()]
        except Exception as e:
            _log_failure("获取所有的课目", e, need_print=True)

    def get_all_batch(self):
        """Return all batches known to the database manager (None on error)."""
        try:
            return self.manager.get_all_batch()
        except Exception as e:
            _log_failure("获取所有的批次", e, need_print=True)

    def get_all_people(self):
        """Return every person as ``{'id': id, 'name': name}`` (None on error)."""
        try:
            return [{'id': k, 'name': v} for k, v in self.manager.get_all_name().items()]
        except Exception as e:
            _log_failure("获取所有的人员", e, need_print=True)

    def get_all_classes(self):
        """Return all classes known to the database manager (None on error)."""
        try:
            return self.manager.get_all_class()
        except Exception as e:
            _log_failure("获取所有的班级", e, need_print=True)

    def get_projects_batch(self):
        """Return ``(project names, batch)`` stored as defaults for the exercise summary."""
        try:
            projects = list(self.summary_info['default_projects'].values())
            batch = self.summary_info['default_batch']
            return projects, batch
        except Exception as e:
            _log_failure("获得课目训练汇总默认的课目和批次", e, need_print=True)

    def set_projects_batch(self, projects, batch):
        """Store the default projects and batch of the exercise summary.

        Returns True, or False when an error was logged. Note that
        ``_update_summary_info`` logs its own failures, so True does not prove
        the file was written.
        """
        try:
            self.summary_info['default_projects'] = projects
            self.summary_info['default_batch'] = batch
            self._update_summary_info()
            return True
        except Exception as e:
            _log_failure("设置课目训练汇总课目和批次", e, need_print=True)
            return False

    def circular_diagram(self, projects, batch):
        """Ring chart data: grade percentages per project for one batch.

        ``projects`` maps project key to display name. Returns
        ``(categorize, {project key: [percentage per GRADE_RANK entry]})``.
        """
        try:
            projects_list = list(projects)
            categorize = [{'name': v, 'keyname': k} for k, v in projects.items()]
            response_data = {k: [] for k in projects}
            data = self.manager.get_exercise_statistic(
                type_list=projects_list,
                batch=batch)
            for key, stats in data.items():
                if key in response_data:
                    for grade in GRADE_RANK:
                        response_data[key].append(round(stats[grade]['percentage'] * 100, 2))
            return categorize, response_data
        except Exception as e:
            _log_failure("环形图--总览", e, need_print=True)

    def detailed_grade_info_1(self, projects, batch, grade):
        """People behind one ring-chart segment (last project key, one batch, one grade)."""
        try:
            project = self._last_key(projects)
            data = self.manager.get_exercise_statistic(
                type_list=[project],
                batch=batch
            )
            return self._person_rows(data[project][grade][DATA])
        except Exception as e:
            _log_failure("详细成绩分布--总览环形图", e, need_print=True)

    def get_class_projects_batch(self):
        """Return ``(project names, batch)`` stored as defaults for the per-class summary."""
        try:
            projects = list(self.summary_info['default_class_projects'].values())
            batch = self.summary_info['default_class_batch']
            return projects, batch
        except Exception as e:
            _log_failure("获得各班课目训练汇总默认的课目和批次", e, need_print=True)

    def set_class_projects_batch(self, projects, batch):
        """Store the default projects and batch of the per-class summary.

        Returns True, or False when an error was logged (same contract as the
        other setters).
        """
        try:
            self.summary_info['default_class_projects'] = projects
            self.summary_info['default_class_batch'] = batch
            self._update_summary_info()
            return True
        except Exception as e:
            _log_failure("设置各班课目训练汇总课目和批次", e, need_print=True)
            return False

    def class_bar_chart(self, projects, batch):
        """Bar chart data per class: ``(sorted classes, [[percentage per class] per grade])``."""
        try:
            project = self._last_key(projects)
            data = self.manager.get_class_statistic(
                batch=batch,
                _type=project
            )
            team = sorted(data)
            response_data = [
                [round(data[name][grade]['percentage'] * 100, 2) for name in team]
                for grade in GRADE_RANK
            ]
            return team, response_data
        except Exception as e:
            _log_failure("班级条形图--总览", e, need_print=True)

    def detailed_grade_info_2(self, projects, batch, grade, _class):
        """People behind one bar of the class chart (one class, one grade)."""
        try:
            project = self._last_key(projects)
            data = self.manager.get_class_statistic(
                batch=batch,
                _type=project
            )
            return self._person_rows(data[_class][grade][DATA])
        except Exception as e:
            _log_failure("详细成绩分布--总览班级条形图览", e, need_print=True)

    def get_group_projects(self):
        """Return the project names stored as defaults for the group summary."""
        try:
            return list(self.summary_info['default_group_projects'].values())
        except Exception as e:
            _log_failure("获得集体课目训练汇总默认的课目览", e, need_print=True)

    def set_group_projects(self, projects):
        """Store the default projects of the group summary.

        Returns True, or False when an error was logged (same contract as the
        other setters).
        """
        try:
            self.summary_info['default_group_projects'] = projects
            self._update_summary_info()
            return True
        except Exception as e:
            _log_failure("设置集体课目训练汇总课目览", e, need_print=True)
            return False

    def batch_bar_chart(self, projects):
        """Bar chart data per batch: ``(all batches, [[percentage per batch] per grade])``."""
        try:
            project = self._last_key(projects)
            data = self.manager.get_group_statistic(
                _type=project
            )
            all_batch = self.manager.get_all_batch()
            response_data = [
                [round(data[batch][grade]['percentage'] * 100, 2) for batch in all_batch]
                for grade in GRADE_RANK
            ]
            return all_batch, response_data
        except Exception as e:
            _log_failure("批次条形图--总览", e, need_print=True)

    def detailed_grade_info_3(self, projects, batch, grade):
        """People behind one bar of the batch chart (one batch, one grade)."""
        try:
            project = self._last_key(projects)
            data = self.manager.get_group_statistic(
                _type=project
            )
            return self._person_rows(data[batch][grade][DATA])
        except Exception as e:
            _log_failure("详细成绩分布--总览批次条形图", e, need_print=True)

    # ------------------------------------------------------------------ summary analysis: group view

    def get_team_projects_batch_class(self):
        """Return ``(project name, batch, class)`` stored as defaults for the group view.

        The project is the last value of the stored mapping (None when empty).
        """
        try:
            projects = None
            for value in self.summary_info['default_team_project'].values():
                projects = value
            batch = self.summary_info['default_team_batch']
            team = self.summary_info['default_team_class']
            return projects, batch, team
        except Exception as e:
            _log_failure("获得集体汇总默认的课目、班级、批次", e, need_print=True)

    def set_team_projects_batch_class(self, projects, batch, _class):
        """Store the default project, batch and class of the group view (True / False)."""
        try:
            self.summary_info['default_team_project'] = projects
            self.summary_info['default_team_batch'] = batch
            self.summary_info['default_team_class'] = _class
            self._update_summary_info()
            return True
        except Exception as e:
            _log_failure("设置集体汇总默认的课目、班级、批次", e, need_print=True)
            return False

    def scale_drawing(self, projects, _class, batch):
        """Group proportion chart: one node per grade with its members as children."""
        try:
            project = self._last_key(projects)
            group_data = self.manager.get_group_multiple_statistic(
                _type=project,
                class_list=_class,
                batch=batch
            )
            response_data = [
                {
                    "name": grade,
                    "value": len(group_data[grade][DATA]),
                    "children": [
                        {
                            "id": data[ID],
                            "name": data[NAME],
                            "score": data[SCORE],
                            "count": data[RECORD] if data[RECORD] else "无",
                            "cn_name": GRADE_LIST[grade],
                            "value": 1
                        }
                        for data in group_data[grade][DATA]
                    ]
                }
                for grade in GRADE_LIST
            ]
            return response_data
        except Exception as e:
            _log_failure("集体比例分布图", e, need_print=True)

    # ------------------------------------------------------------------ summary analysis: personal view

    def get_personal_projects_batch_person(self):
        """Return ``(project names, batch, person)`` stored as defaults for the personal view."""
        try:
            projects = list(self.summary_info['default_personal_projects'].values())
            batch = self.summary_info['default_personal_batch']
            personal = self.summary_info['default_personal_personal']
            return projects, batch, personal
        except Exception as e:
            _log_failure("获得个人汇总默认的课目、人员、批次", e, need_print=True)

    def set_personal_projects_batch_person(self, projects, batch, person):
        """Store the default projects, batch and person of the personal view (True / False)."""
        try:
            self.summary_info['default_personal_projects'] = projects
            self.summary_info['default_personal_batch'] = batch
            self.summary_info['default_personal_personal'] = person
            self._update_summary_info()
            return True
        except Exception as e:
            _log_failure("设置个人汇总默认的课目、人员、批次", e, need_print=True)
            return False

    def radar_chart(self, person_id, projects, batch):
        """Radar chart data: the ``'type_avg'`` part of one person's statistics."""
        try:
            data = self.manager.get_individual_statistic(
                _id=str(person_id),
                _type_list=list(projects),
                batch_list=batch
            )
            return data['type_avg']
        except Exception as e:
            _log_failure("雷达图", e, need_print=True)

    def line_chart(self, person_id, projects, batch):
        """Line chart data: one person's score per project across batches.

        The y-axis maximum is the highest score rounded up past the next
        multiple of 10 (a score already on a multiple gains a full extra 10),
        never below 100, and is split into five intervals.
        """
        try:
            projects_list = list(projects)
            categorize = [{'name': v, 'keyname': k} for k, v in projects.items()]
            valueDdata = {k: [] for k in projects}
            data = self.manager.get_individual_statistic(
                _id=str(person_id),
                _type_list=projects_list,
                batch_list=batch
            )
            batch_data = data['batch']
            batch_avg = data['batch_avg']
            maxScode = 0
            minScore = 0
            for per_batch in batch_data.values():
                for key, series in valueDdata.items():
                    score = per_batch[key]['score']
                    series.append(score)
                    if score > maxScode:
                        maxScode = score
            maxScode = maxScode + (10 - maxScode % 10)
            if maxScode < 100:
                maxScode = 100
            interval = maxScode / 5
            average = list(batch_avg.values())
            response_data = {
                'minScore': minScore,
                'maxScore': maxScode,
                'interval': interval,
                'average': average,
                'barData': {
                    'categorize': categorize,
                    'valueDdata': valueDdata
                }
            }
            return response_data
        except Exception as e:
            _log_failure("折线图", e, need_print=True)

    def _update_summary_info(self):
        """Persist ``self.summary_info`` to SUMMARY_FILE as JSON; failures are only logged."""
        try:
            with open(SUMMARY_FILE, "w", encoding='utf-8_sig') as f:
                json.dump(self.summary_info, f, ensure_ascii=False)
        except Exception as e:
            _log_failure("更新汇总信息json", e, need_print=True)