LISHUZUOXUN_yangjiang/PureBackend/data_manager.py

365 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 代码编写人:曾忠和
# coding: gb2312
from Database.database_pyqt import *
from PureBackend.base_driver import MODEL_MEDIAPIPE
from PureBackend.exam_driver import ExamDriver
from PureBackend.general import *
from LogRecord.log_recorder import GLOBAL_LOG
from typing import List
def format_time(seconds):
    """Render a duration given in seconds as an ``HH:mm:ss`` string.

    ``seconds`` may be anything ``float()`` accepts (number or numeric
    string). Fractions of a second are truncated; hours are not wrapped
    at 24.
    """
    total = float(seconds)
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    parts = (int(hours), int(minutes), int(secs))
    return ":".join("{:02d}".format(part) for part in parts)
class DataManager(ExamDriver):
    """Person / score data operations layered on top of ``ExamDriver``.

    Every public method wraps one or more ``self.manager`` calls in a
    ``try``/``except`` that logs the failure through ``GLOBAL_LOG`` and
    returns a falsy value instead of raising. ``self.manager``,
    ``self.speak_driver``, ``self.project`` and ``self.running`` are not
    defined in this class; presumably they are provided by ``ExamDriver``.
    """

    # Fields that must be present and non-empty when adding/updating a person.
    _REQUIRED_PERSON_FIELDS = ('name', 'id', 'class', 'gender', 'age')

    def __init__(self, master_mode=True, positioning=True, camera=True, model=MODEL_MEDIAPIPE, speaker=True,
                 multi_positioning_mode=True, device_type=UNKNOWN, pure_mode=False):
        """Forward every option, unchanged and in order, to ``ExamDriver.__init__``."""
        super().__init__(master_mode, positioning, camera, model, speaker, multi_positioning_mode, device_type,
                         pure_mode)

    def update_score(self):
        """Record the result of the current exercise for the current person.

        Reads the person from ``self.project.info`` and the result from
        ``self.project.get_result()``, publishes it through the score
        signals, then inserts or (if better) updates the stored score.

        Returns:
            True when processing completed, False if an exception was
            raised (the exception is logged).
        """
        try:
            person_id = self.project.info[ID]
            name = self.project.info[NAME]
            person_class = self.project.info['class']
            result = self.project.get_result()
            score = result["score"]
            had_done = result['had_done']
            # A '不合格' (fail) grade is stored as a numeric score of 0.
            if score == '不合格':
                real_score = 0
            else:
                real_score = float(score)
            count = result["count"]

            self.synchronization_info[self.exercise_tag] = {}
            self.send_score_signal.set()
            self._statistics_sql_score = {
                self.exercise_tag: {person_id: {'record': count, 'score': real_score}}}
            if had_done:
                self.send_sql_score_signal.set()
                _count = count
            else:
                # -1 marks a record for an exercise that was not completed.
                _count = -1

            _result = self.manager.select_a_score(person_id=person_id, score_type=self.exercise_tag)
            if _result is None:
                # No stored score yet for this person/exercise: insert one.
                self.manager.insert_a_score(record=count, score=real_score, person_id=person_id,
                                            score_type=self.exercise_tag,
                                            person_class=person_class, name=name)
            else:
                _record = float(_result[0]['record'])
                _score = float(_result[0]['score'])
                if self.exercise_tag in {PUSHUP, PULLUP, SITUP, OVERHANG}:
                    # Counting exercises: a larger record or a higher score wins.
                    if float(count) > _record or real_score > _score:
                        self.manager.update_counting_score(record=count, score=real_score,
                                                           person_id=person_id,
                                                           score_type=self.exercise_tag)
                elif self.exercise_tag == RUNAROUND:
                    # Timed exercise: a smaller record or a higher score wins.
                    # NOTE(review): the guard tests `count` but the value stored
                    # is `_count` (-1 when had_done is falsy) - confirm intent.
                    if count != -1 and (float(count) < _record or real_score > _score):
                        self.manager.update_timekeeping_score(record=_count, score=real_score,
                                                              person_id=person_id,
                                                              score_type=self.exercise_tag,
                                                              )
            return True
        except Exception as e:
            GLOBAL_LOG.write(f"录入作训成绩发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return False

    def update_running_score(self):
        """Persist the valid long-distance running scores.

        Takes the entries of ``self.running.get_valid_score()``; each one is
        inserted if the person has no RUNNING score yet, or used to update
        the stored one when the time is lower or the score is higher.

        Returns:
            True when processing completed, False if an exception was
            raised (the exception is logged).
        """
        try:
            final_score = self.running.get_valid_score()
            self.synchronization_info[RUNNING] = []
            self.send_score_signal.set()
            self._statistics_sql_score = {RUNNING: {}}
            for score in final_score:
                result = self.manager.select_a_score(person_id=score[ID], score_type=RUNNING)
                if result is None:
                    self.manager.insert_a_score(record=score["total_time"], score=score['score'],
                                                person_id=score[ID],
                                                score_type=RUNNING,
                                                person_class=score['class'], name=score[NAME])
                else:
                    _record = float(result[0]['record'])
                    _score = float(result[0]['score'])
                    if float(score["total_time"]) < _record or float(score['score']) > _score:
                        self.manager.update_timekeeping_score(record=score["total_time"], score=score['score'],
                                                              person_id=score[ID], score_type=RUNNING,
                                                              )
                # NOTE(review): assumed to run for every runner (mirrors
                # update_score, which always fills the statistics) - confirm.
                self._statistics_sql_score[RUNNING][score[ID]] = {'record': score["total_time"],
                                                                  'score': score['score']}
            if final_score:
                self.send_sql_score_signal.set()
            return True
        except Exception as e:
            GLOBAL_LOG.write(f"确认长跑成绩有效发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return False

    def add_person(self, data):
        """Insert one person record.

        Args:
            data: mapping of person fields; empty-string values are treated
                as missing (converted to None before use).

        Returns:
            True if the manager reported OK, otherwise False. The outcome
            is also announced through ``self.speak_driver``.
        """
        try:
            data = {key: value if value != '' else None for key, value in data.items()}
            # .get() so that an absent key is reported as missing information
            # instead of surfacing as a KeyError.
            if all(data.get(key) is not None for key in self._REQUIRED_PERSON_FIELDS):
                response_code = self.manager.insert_a_info(data)
            else:
                response_code = NO_OBJ
                self.speak_driver.add_speak("缺少关键信息!")
            if response_code == OK:
                self.speak_driver.add_speak("成功添加人员!")
                return True
            self.speak_driver.add_speak("新增人员失败!")
            return False
        except Exception as e:
            GLOBAL_LOG.write(f"增加一个人员发生错误,{str(e)},错误来源:{traceback.format_exc()}", need_print=True)
            return False

    def load_xlsx(self, file):
        """Import persons from an Excel file.

        ``file`` is passed straight to
        ``self.manager.insert_many_person_from_xlsx``.

        Returns:
            True if the manager reported OK, otherwise False.
        """
        try:
            response_code = self.manager.insert_many_person_from_xlsx(file)
            if response_code == OK:
                self.speak_driver.add_speak("成功导入人员信息!")
                return True
            self.speak_driver.add_speak("导入人员信息失败!")
            return False
        except Exception as e:
            GLOBAL_LOG.write(f"导入excel发生错误{str(e)},错误来源:{traceback.format_exc()}", need_print=True)
            return False

    def update_person(self, data):
        """Update one person record.

        Args:
            data: mapping of person fields; empty-string values are treated
                as missing (converted to None before use).

        Returns:
            True if the manager reported OK, otherwise False. The outcome
            is also announced through ``self.speak_driver``.
        """
        try:
            data = {key: value if value != '' else None for key, value in data.items()}
            # .get() so that an absent key is reported as missing information
            # instead of surfacing as a KeyError.
            if all(data.get(key) is not None for key in self._REQUIRED_PERSON_FIELDS):
                response_code = self.manager.update_a_person(data)
            else:
                response_code = NO_OBJ
                self.speak_driver.add_speak("缺少关键信息!")
            if response_code == OK:
                self.speak_driver.add_speak("成功修改人员信息!")
                return True
            self.speak_driver.add_speak("修改人员信息失败!")
            return False
        except Exception as e:
            GLOBAL_LOG.write(f"更新某个人的数据发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return False

    def delete_person(self, _id_list):
        """Delete one person, or every person in a list, together with their scores.

        Args:
            _id_list: a single person identifier or a list of them.

        Returns:
            True when processing completed, False if an exception was
            raised (the exception is logged).
        """
        try:
            targets = _id_list if isinstance(_id_list, List) else [_id_list]
            for person in targets:
                # Scores are removed only when the person row itself was deleted.
                if self.manager.delete_a_person(person) == OK:
                    self.manager.delete_a_score(person)
            # NOTE(review): success is announced regardless of the individual
            # response codes - confirm this is intended.
            self.speak_driver.add_speak("成功删除人员!")
            return True
        except Exception as e:
            GLOBAL_LOG.write(f"删除某个人的数据发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return False

    def delete_all_person(self):
        """Delete every person record; return True if the manager reported OK, else False."""
        try:
            response_code = self.manager.delete_all_person()
            if response_code == OK:
                return True
            return False
        except Exception as e:
            GLOBAL_LOG.write(f"删除所有人员信息发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return False

    def load_score_xlsx(self, file):
        """Import scores from an uploaded Excel file.

        The upload is first saved as ``CACHE_DIR + "score.xlsx"`` (the
        directory is created if needed) and that path is handed to
        ``self.manager.insert_many_score_from_xlsx``.

        Args:
            file: object exposing ``save(path)``.

        Returns:
            True if the manager reported OK, otherwise False.
        """
        try:
            os.makedirs(CACHE_DIR, exist_ok=True)
            save_file_dir = CACHE_DIR + "score.xlsx"
            file.save(save_file_dir)
            response_code = self.manager.insert_many_score_from_xlsx(save_file_dir)
            if response_code == OK:
                return True
            return False
        except Exception as e:
            GLOBAL_LOG.write(f"导入成绩excel发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return False

    def get_a_name_score(self, name, batch=None):
        """Look up score rows by person name (optionally by batch); None on error."""
        try:
            return self.manager.select_a_score_from_name(name=name, batch=batch)
        except Exception as e:
            GLOBAL_LOG.write(f"搜索人员成绩信息发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_a_name_person(self, name):
        """Look up person rows by name; None on error."""
        try:
            return self.manager.select_a_person_from_name(name)
        except Exception as e:
            GLOBAL_LOG.write(f"搜索人员基本信息发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def delete_all_score(self):
        """Clear every stored score.

        Returns:
            True if the manager reported OK, False if it did not, and None
            if an exception was raised (the exception is logged).
        """
        try:
            response_code = self.manager.clear_all_score()
            if response_code == OK:
                self.speak_driver.add_speak("成功删除成绩信息!")
                return True
            self.speak_driver.add_speak("删除成绩信息失败!")
            return False
        except Exception as e:
            GLOBAL_LOG.write(f"删除所有人员成绩发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_person(self, _class, name):
        """Query persons filtered by class and name; None on error."""
        try:
            return self.manager.select_person_manager_pyqt(_class=_class, name=name)
        except Exception as e:
            GLOBAL_LOG.write(f"获得指定班级的数据发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_all_class(self):
        """Return the manager's list of classes; False (not None) on error."""
        try:
            return self.manager.get_all_class()
        except Exception as e:
            GLOBAL_LOG.write(f"获得所有班级列表发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return False

    def get_all_person(self):
        """Return every person record; None on error."""
        try:
            return self.manager.select_all_person()
        except Exception as e:
            GLOBAL_LOG.write(f"获得所有人员信息发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_score(self, _class=None, batch=None, name=None):
        """Query scores filtered by class, batch and name; None on error."""
        try:
            return self.manager.select_score_manager_pyqt(_class=_class, batch=batch, name=name)
        except Exception as e:
            GLOBAL_LOG.write(f"获得指定班级成绩发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_all_score(self, batch):
        """Return every score row of the given batch; None on error."""
        try:
            return self.manager.select_all_score(batch=batch)
        except Exception as e:
            GLOBAL_LOG.write(f"获得所有人员信息成绩发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_band_mes(self, band_id):
        """Return the first person record matching a wristband id.

        Returns:
            The first row from ``select_a_person_from_band``, or None when
            ``band_id`` is falsy, nothing matches, or an error occurs.
        """
        try:
            if not band_id:
                return None
            raw_info = self.manager.select_a_person_from_band(band_id)
            if not raw_info:
                GLOBAL_LOG.write(f"准备进行曲臂悬垂发生错误: 该手环没有匹配的人员", need_print=True)
                return None
            return raw_info[0]
        except Exception as e:
            GLOBAL_LOG.write(f"get_band_mes发生错误{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def select_exercise_score(self, exercise_type, batch=None):
        """Build per-person score entries for one exercise type.

        Args:
            exercise_type: exercise name. 'runaround' maps to the
                ``run_bf_*`` columns, 'running' to ``running_*`` (with the
                count rendered by ``format_time``); any other name has its
                underscores removed and ``_score`` / ``_count`` appended.
            batch: forwarded to ``get_all_score``. Previously that call was
                made without its required argument, so this method always
                failed and returned None.
                NOTE(review): None is assumed to be an acceptable batch
                value for ``select_all_score`` - confirm.

        Returns:
            A list of dicts keyed by ``'is_choose'``, NAME, ID, SCORE, COUNT,
            CLASS and BAND_ID, or None on error.
        """
        try:
            all_data = self.get_all_score(batch)
            if all_data is None:
                # get_all_score already logged the failure.
                return None
            if exercise_type == 'running':
                return [
                    {
                        'is_choose': False,
                        NAME: score["name"],
                        ID: score["id"],
                        SCORE: int(score['running_score']),
                        COUNT: format_time(score['running_count']),
                        CLASS: score['class'],
                        BAND_ID: score["band_id"]
                    }
                    for score in all_data
                ]
            if exercise_type == 'runaround':
                prefix = 'run_bf'
            else:
                # Column names carry no underscore inside the exercise name.
                prefix = exercise_type.replace('_', '')
            exercise_score = prefix + '_score'
            exercise_count = prefix + '_count'
            return [
                {
                    'is_choose': False,
                    NAME: score["name"],
                    ID: score["id"],
                    SCORE: score[exercise_score],
                    COUNT: score[exercise_count],
                    CLASS: score['class'],
                    BAND_ID: score["band_id"]
                }
                for score in all_data
            ]
        except Exception as e:
            GLOBAL_LOG.write(f"获得指定项目成绩发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_person_xlsx(self, folder):
        """Export person records to ``<folder>/person.xlsx``.

        Returns:
            True on success, None if an exception was raised (logged).
        """
        try:
            self.manager.dump_person(folder + '/' + "person.xlsx")
            self.speak_driver.add_speak("成功导出人员信息!")
            return True
        except Exception as e:
            GLOBAL_LOG.write(f"导出人员信息发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None

    def get_score_xlsx(self, folder):
        """Export score records to ``<folder>/score.xlsx``.

        Returns:
            True on success, None if an exception was raised (logged).
        """
        try:
            self.manager.dump_score(folder + '/' + "score.xlsx")
            self.speak_driver.add_speak("成功导出成绩信息!")
            return True
        except Exception as e:
            GLOBAL_LOG.write(f"导出所有人员信息成绩发生错误,{str(e)},错误来源:{traceback.format_exc()}")
            return None