# LISHUZUOXUN_yangjiang/Database/database_pyqt.py
#
# 1255 lines
# 44 KiB
# Python
# Raw Normal View History
#
# 2024-09-23 14:54:15 +08:00
import os
import sqlite3
import time
import traceback
import pandas as pd
from AcrossPlatform.get_platform import GLOBAL_DIR
from SQLDataBase.sql_server import SQLServer
# Directory and file name handed to SQLServer when the Database class opens
# its storage.
MANAGER_DIR = os.path.join(GLOBAL_DIR, "Manager")
MANAGER_DATABASE = "complete_database.db"
# Person attributes (column names of the PERSON table)
ID = "id"
NAME = "name"
BAND_ID = "band_id"
CLASS = "class"
GENDER = "gender"
AGE = "age"
HEIGHT = "height"
WEIGHT = "weight"
BMI = "bmi"
PBF = "pbf"
PERSON_TYPE = "person_type"
PERSON_FEATURE_LIST = [
    ID,
    NAME,
    BAND_ID,
    CLASS,
    GENDER,
    AGE,
    HEIGHT,
    WEIGHT,
    BMI,
    PBF,
    PERSON_TYPE,
]
# Spreadsheet headers, positionally aligned with PERSON_FEATURE_LIST
PERSON_TABLE_COLUMNS = [
    "人员编号",
    "姓名",
    "手环",
    "所在班",
    "性别",
    "年龄",
    "身高",
    "体重",
    "BMI",
    "PBF",
    "人员类型",
]

# Score attributes
BATCH = "batch"
SCORE_TYPE = "score_type"
RECORD = "record"
SCORE = "score"
COUNT = "count"
GRADE = "grade"

# Exercise (subject) keys and their display names
PUSHUP = "pushup"
SITUP = "situp"
OVERHANG = "overhang"
RUNAROUND = "runaround"
RUNNING = "running"
PULLUP = "pullup"
EXERCISE_NAME_LIST = {
    PUSHUP: "俯卧撑",
    SITUP: "仰卧起坐",
    OVERHANG: "曲臂悬垂",
    RUNAROUND: "30*2蛇形跑",
    RUNNING: "长跑",
    PULLUP: "引体向上",
}
# Grade keys, their display names, and their ranking from best to worst
GRADE_GREAT = "GREAT"
GRADE_GOOD = "GOOD"
GRADE_PASS = "PASS"
GRADE_FAIL = "FAIL"
GRADE_MISS = "MISS"
GRADE_LIST = {
    GRADE_GREAT: "优秀",
    GRADE_GOOD: "良好",
    GRADE_PASS: "及格",
    GRADE_FAIL: "不及格",
    GRADE_MISS: "缺考"
}
GRADE_RANK = [
    GRADE_GREAT,
    GRADE_GOOD,
    GRADE_PASS,
    GRADE_FAIL,
    GRADE_MISS
]
# Keys used inside the statistic result dictionaries
DATA = "data"
PERCENTAGE = "percentage"
BATCH_AVG = "batch_avg"
TYPE_AVG = "type_avg"


def get_grade(score):
    """Map a numeric score to its grade key.

    Scores >= 90 are GREAT, [70, 90) GOOD, [60, 70) PASS and [0, 60) FAIL.
    Everything else -- a negative value, NaN, or a missing score (``None``,
    which is what a NULL score column yields) -- is reported as MISS.
    """
    # A NULL score cannot be compared with a number; treat it as "not taken"
    # instead of raising TypeError in the middle of a statistic.
    if score is None:
        return GRADE_MISS
    if score >= 90:
        return GRADE_GREAT
    if 70 <= score < 90:
        return GRADE_GOOD
    if 60 <= score < 70:
        return GRADE_PASS
    if 0 <= score < 60:
        return GRADE_FAIL
    return GRADE_MISS
# Columns of one SCORE row, and the matching spreadsheet headers
SCORE_FEATURE_LIST = [
    ID,
    NAME,
    CLASS,
    BATCH,
    SCORE_TYPE,
    RECORD,
    SCORE,
]
SCORE_TABLE_COLUMNS = [
    "人员编号",
    "姓名",
    "所在班",
    "批次",
    "课目",
    "记录",
    "成绩",
]

# Per-exercise keys of the per-person score sheet
PULLUP_COUNT = "pullup_count"
PULLUP_SCORE = "pullup_score"
HANGING_COUNT = "hanging_count"
HANGING_SCORE = "hanging_score"
PUSHUP_COUNT = "pushup_count"
PUSHUP_SCORE = "pushup_score"
SITUP_COUNT = "situp_count"
SITUP_SCORE = "situp_score"
RUNNING_COUNT = "running_count"
RUNNING_SCORE = "running_score"
RUN_BF_COUNT = "run_bf_count"
RUN_BF_SCORE = "run_bf_score"
FINAL_SCORE = "final_score"
SCORE_TYPE_FEATURE_LIST = [
    ID,
    NAME,
    BAND_ID,
    CLASS,
    GENDER,
    AGE,
    HEIGHT,
    WEIGHT,
    BMI,
    PBF,
    PERSON_TYPE,
    PUSHUP_SCORE,
    PUSHUP_COUNT,
    SITUP_SCORE,
    SITUP_COUNT,
    HANGING_SCORE,
    HANGING_COUNT,
    RUN_BF_SCORE,
    RUN_BF_COUNT,
    RUNNING_SCORE,
    RUNNING_COUNT,
    PULLUP_SCORE,
    PULLUP_COUNT,
    FINAL_SCORE,
]
# Spreadsheet headers, positionally aligned with SCORE_TYPE_FEATURE_LIST.
# NOTE(review): the headers at the BMI / PBF positions read "心率" / "血氧"
# here but "BMI" / "PBF" in PERSON_TABLE_COLUMNS -- confirm which is intended.
SCORE_TYPE_TABLE_COLUMNS = [
    "人员编号",
    "姓名",
    "手环编号",
    "所在班",
    "性别",
    "年龄",
    "身高",
    "体重",
    "心率",
    "血氧",
    "人员类型",
    "俯卧撑成绩",
    "俯卧撑个数",
    "仰卧起坐成绩",
    "仰卧起坐个数",
    "曲臂悬垂成绩",
    "曲臂悬垂时间(秒)",
    "蛇形跑成绩",
    "蛇形跑时间(秒)",
    "长跑成绩",
    "长跑时间(秒)",
    "引体向上成绩",
    "引体向上个数",
    "最终成绩",
]

# Status codes returned by the Database methods
ID_ERROR = 1
UNKNOWN_ERROR = -1
OK = 0
NO_OBJ = 2
class Database:
def __init__(self) -> None:
    """Open the manager database and make sure its three tables exist."""
    super().__init__()
    self.database = SQLServer(server_dir=MANAGER_DIR, server_name=MANAGER_DATABASE)
    # Each creator issues CREATE TABLE IF NOT EXISTS, so this is safe to
    # repeat on every start.
    for ensure_table in (
        self._create_person_table,
        self._create_score_table,
        self._create_batch_table,
    ):
        ensure_table()
# 创建人员索引表
def _create_person_table(self):
    """Create the PERSON table (keyed by id) unless it already exists."""
    column_defs = (
        "id CHAR(100) PRIMARY KEY",
        "name CHAR(100)",
        "band_id TEXT",
        "class TEXT",
        "gender CHAR(20)",
        "age INT",
        "height INT",
        "weight INT",
        "bmi FLOAT",
        "pbf FLOAT",
        "person_type INT",
    )
    ddl = "CREATE TABLE IF NOT EXISTS PERSON (" + ",".join(column_defs) + ");"
    self.database.execute(ddl)
def _create_score_table(self):
    """Create the SCORE table (one row per recorded result) unless it exists."""
    column_defs = (
        "id CHAR(100)",
        "name CHAR(100)",
        "class TEXT",
        "batch TEXT",
        "score_type TEXT",
        "record TEXT",
        "score FLOAT",
    )
    ddl = "CREATE TABLE IF NOT EXISTS SCORE (" + ",".join(column_defs) + ");"
    self.database.execute(ddl)
def _create_batch_table(self):
    """Create the BATCH_INFO table (batch name + start time) unless it exists."""
    column_defs = (
        "batch TEXT",
        "start_time INT",
    )
    ddl = "CREATE TABLE IF NOT EXISTS BATCH_INFO (" + ",".join(column_defs) + ");"
    self.database.execute(ddl)
# 获得当前批次
def get_this_batch(self):
    """Return the name of the most recently started batch.

    Returns:
        The ``batch`` value of the BATCH_INFO row with the greatest
        ``start_time``, or ``None`` when no batch exists or the query fails.
    """
    sql = "SELECT batch FROM BATCH_INFO " \
          "ORDER BY start_time DESC " \
          "LIMIT 1"
    try:
        rows = self.database.execute(sql)
        # The query selects a single column, so read it directly instead of
        # routing it through an unrelated column-name list.
        for row in rows or ():
            return row[0]
        return None
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
# 新增一个批次
def generate_batch(self):
    """Register a batch named after the current local hour and return its name.

    The name is built from the current time; a BATCH_INFO row is inserted
    only when no row with that name exists yet.

    Returns:
        The batch name, or ``None`` if the database raised an error.
    """
    now = int(time.time())
    batch_name = time.strftime("%Y年%m月%d%H时", time.localtime(now))
    try:
        existing = self.database.execute(
            "SELECT batch, start_time FROM BATCH_INFO WHERE batch = ?;",
            (batch_name,),
        )
        if not existing:
            self.database.execute(
                "INSERT INTO BATCH_INFO (batch, start_time) VALUES (?, ?);",
                (batch_name, now),
            )
            self.database.commit()
        return batch_name
    except sqlite3.IntegrityError:
        return None
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return None
def get_all_score(self):
    """Return every SCORE row as a dict keyed by SCORE_FEATURE_LIST.

    Returns ``None`` when the query raises.
    """
    query = f"SELECT {', '.join(SCORE_FEATURE_LIST)} FROM SCORE"
    try:
        rows = self.database.execute(query)
        return [dict(zip(SCORE_FEATURE_LIST, row)) for row in rows]
    except sqlite3.IntegrityError:
        return None
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return None
def get_all_batch(self):
    """Return the distinct batch names ordered by start time, or None on error."""
    query = f"SELECT {BATCH} FROM BATCH_INFO GROUP BY {BATCH} ORDER BY start_time"
    try:
        batches = []
        for row in self.database.execute(query):
            batches.append(row[0])
        return batches
    except sqlite3.IntegrityError:
        return None
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return None
def get_all_id(self):
    """Return the sorted list of person ids, or None on a database error."""
    query = f"SELECT {ID} FROM PERSON GROUP BY {ID}"
    try:
        rows = self.database.execute(query)
        return sorted(row[0] for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return None
def get_all_name(self):
    """Return a ``{person id: name}`` mapping covering every person.

    Returns ``None`` when the query raises.
    """
    # No GROUP BY on the name: grouping by name keeps only one row per
    # distinct name, so people who share a name would be missing from the
    # mapping even though their ids are unique.
    sql = f"SELECT {ID}, {NAME} FROM PERSON ORDER BY {ID}"
    try:
        data = self.database.execute(sql)
        return {row[0]: row[1] for row in data}
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_id_class(self):
    """Return a ``{person id: class}`` mapping, or None on a database error."""
    query = f"SELECT {ID}, {CLASS} FROM PERSON ORDER BY {ID}"
    try:
        rows = self.database.execute(query)
        return dict((row[0], row[1]) for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return None
def get_all_class(self):
    """Return the sorted list of distinct classes, or None on any error."""
    query = f"SELECT {CLASS} FROM PERSON GROUP BY {CLASS}"
    try:
        rows = self.database.execute(query)
        return sorted(row[0] for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return None
@staticmethod
def get_all_exercise_types():
    """Return the exercise keys of EXERCISE_NAME_LIST in declaration order."""
    return list(EXERCISE_NAME_LIST)
# 获得各课目训练汇总
def get_exercise_statistic(self, type_list, batch):
    """Group every person by grade for each requested exercise.

    Args:
        type_list: exercise keys to report on.
        batch: batch name, or a container of batch names, whose scores count.

    Returns:
        ``{exercise: {grade: {DATA: [person entries], PERCENTAGE: share}}}``.
        Persons without a matching score are listed under GRADE_MISS. On an
        unexpected error the table built so far is returned.
    """
    grade_table = {
        _type: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for _type in type_list
    }
    # With a bare string ``in`` would be a substring test and would raise on
    # rows whose batch is NULL; compare whole names instead.
    batches = [batch] if isinstance(batch, str) else batch
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name() or {}
        # Everybody starts as "missed" for every exercise.
        person_group = {
            _id: {
                _type: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for _type in type_list
            }
            for _id in all_id
        }
        for data in full_data:
            if data[SCORE_TYPE] not in type_list or data[BATCH] not in batches:
                continue
            person = person_group.get(data[ID])
            if person is None:
                # Score row of somebody no longer in PERSON: skip it rather
                # than abort the whole statistic.
                continue
            entry = person[data[SCORE_TYPE]]
            entry[GRADE] = get_grade(data[SCORE])
            entry[SCORE] = data[SCORE]
            entry[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            for _type, type_values in id_values.items():
                grade_table[_type][type_values[GRADE]][DATA].append({
                    ID: _id,
                    NAME: all_name.get(_id),
                    SCORE: type_values[SCORE],
                    RECORD: type_values[RECORD]
                })
        # Share of all persons per grade; stays 0 when there is nobody.
        total = len(all_id)
        for type_values in grade_table.values():
            for grade_values in type_values.values():
                grade_values[PERCENTAGE] = len(grade_values[DATA]) / total if total else 0
    except Exception as e:
        print(e.args, traceback.format_exc())
    return grade_table
# 各班级科目汇总
def get_class_statistic(self, batch, _type):
    """Group the members of every class by grade for one exercise and batch.

    Args:
        batch: batch name the scores must belong to.
        _type: exercise key to report on.

    Returns:
        ``{class: {grade: {DATA: [person entries], PERCENTAGE: share}}}``
        where the share is relative to the size of that class. Persons
        without a matching score are listed under GRADE_MISS. On an
        unexpected error the table built so far is returned.
    """
    all_class = self.get_all_class() or []
    class_table = {
        _class: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for _class in all_class
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name() or {}
        all_id_class = self.get_id_class() or {}
        # The class comes from PERSON for everybody, so people without a
        # score still land in their own class (as GRADE_MISS).
        person_group = {
            _id: {
                GRADE: GRADE_MISS,
                SCORE: 0,
                RECORD: None,
                CLASS: all_id_class.get(_id)
            }
            for _id in all_id
        }
        for data in full_data:
            if data[SCORE_TYPE] != _type or data[BATCH] != batch:
                continue
            person = person_group.get(data[ID])
            if person is None:
                # Score row of somebody no longer in PERSON.
                continue
            person[GRADE] = get_grade(data[SCORE])
            person[SCORE] = data[SCORE]
            person[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            grades = class_table.get(id_values[CLASS])
            if grades is None:
                continue
            grades[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD]
            })
        for class_values in class_table.values():
            class_size = sum(len(grade_values[DATA]) for grade_values in class_values.values())
            for grade_values in class_values.values():
                grade_values[PERCENTAGE] = (
                    len(grade_values[DATA]) / class_size if class_size else 0
                )
    except Exception as e:
        print(e.args, traceback.format_exc())
    return class_table
# 集体科目训练汇总
def get_group_statistic(self, _type):
    """Group every person by grade for one exercise, separately per batch.

    Args:
        _type: exercise key to report on.

    Returns:
        ``{batch: {grade: {DATA: [person entries], PERCENTAGE: share}}}``
        for every batch in BATCH_INFO. Persons without a score in a batch
        are listed under GRADE_MISS for that batch. On an unexpected error
        the table built so far is returned.
    """
    all_batch = self.get_all_batch() or []
    batch_table = {
        batch: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for batch in all_batch
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name() or {}
        person_group = {
            _id: {
                batch: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for batch in all_batch
            }
            for _id in all_id
        }
        for data in full_data:
            if data[SCORE_TYPE] != _type:
                continue
            # Rows of unknown persons or of batches missing from BATCH_INFO
            # (including a NULL batch) have no slot; skip them.
            entry = person_group.get(data[ID], {}).get(data[BATCH])
            if entry is None:
                continue
            entry[GRADE] = get_grade(data[SCORE])
            entry[SCORE] = data[SCORE]
            entry[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            for batch, batch_values in id_values.items():
                batch_table[batch][batch_values[GRADE]][DATA].append({
                    ID: _id,
                    NAME: all_name.get(_id),
                    SCORE: batch_values[SCORE],
                    RECORD: batch_values[RECORD]
                })
        for batch_values in batch_table.values():
            batch_size = sum(len(grade_values[DATA]) for grade_values in batch_values.values())
            for grade_values in batch_values.values():
                grade_values[PERCENTAGE] = (
                    len(grade_values[DATA]) / batch_size if batch_size else 0
                )
    except Exception as e:
        print(e.args, traceback.format_exc())
    return batch_table
# 集体统计
def get_group_multiple_statistic(self, _type, class_list, batch):
    """Group the members of the given classes by grade for one exercise and batch.

    Args:
        _type: exercise key to report on.
        class_list: classes whose members are included.
        batch: batch name the scores must belong to.

    Returns:
        ``{grade: {DATA: [person entries]}}``. Members without a matching
        score are listed under GRADE_MISS. On an unexpected error the table
        built so far is returned.
    """
    table = {
        grade: {DATA: []}
        for grade in GRADE_LIST
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name() or {}
        id_class = self.get_id_class() or {}
        person_group = {
            _id: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
            for _id in all_id
            if id_class.get(_id) in class_list
        }
        for data in full_data:
            if data[SCORE_TYPE] != _type or data[BATCH] != batch:
                continue
            if data[CLASS] not in class_list:
                continue
            person = person_group.get(data[ID])
            if person is None:
                # The score row names a class from class_list but the person
                # is unknown or now belongs to another class.
                continue
            person[GRADE] = get_grade(data[SCORE])
            person[SCORE] = data[SCORE]
            person[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            table[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD]
            })
    except Exception as e:
        print(e.args, traceback.format_exc())
    return table
# 个人统计
def get_individual_statistic(self, _id, _type_list, batch_list):
    """Collect one person's scores per batch and exercise.

    Returns:
        A dict with three entries: BATCH maps each batch to
        ``{exercise: {SCORE: value}}``; BATCH_AVG maps each batch to the sum
        of its scores divided by ``len(_type_list)``; TYPE_AVG maps each
        exercise to the highest score found across the batches (a maximum,
        despite the key name). On an unexpected error the structure built so
        far is returned.
    """
    group = {
        BATCH: {
            batch: {_type: {SCORE: 0} for _type in _type_list}
            for batch in batch_list
        },
        BATCH_AVG: dict.fromkeys(batch_list, 0),
        TYPE_AVG: dict.fromkeys(_type_list, 0),
    }
    try:
        rows = self.get_all_score().copy()
        for row in rows:
            wanted = (
                row[ID] == _id
                and row[SCORE_TYPE] in _type_list
                and row[BATCH] in batch_list
            )
            if wanted:
                group[BATCH][row[BATCH]][row[SCORE_TYPE]][SCORE] = row[SCORE]
        best = group[TYPE_AVG]
        for batch, per_type in group[BATCH].items():
            total = sum([entry[SCORE] for entry in per_type.values()])
            group[BATCH_AVG][batch] = total / len(_type_list)
            for _type, entry in per_type.items():
                best[_type] = max(entry[SCORE], best[_type])
    except Exception as exc:
        print(exc.args, traceback.format_exc())
    return group
# 插入信息
def insert_a_info(self, data):
    """Insert one person given as a dict keyed by PERSON_FEATURE_LIST.

    Missing keys are stored as NULL.

    Returns:
        OK on success, ID_ERROR on an integrity violation, UNKNOWN_ERROR on
        any other failure.
    """
    values = [data.get(column) for column in PERSON_FEATURE_LIST]
    placeholders = ", ".join(["?"] * len(PERSON_FEATURE_LIST))
    sql = f"INSERT INTO PERSON ({', '.join(PERSON_FEATURE_LIST)}) VALUES ({placeholders});"
    try:
        self.database.execute(sql, values)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
# 从Excel导入一个数据
def insert_many_person_from_xlsx(self, xlsx_file_name):
    """Import persons from an Excel sheet whose headers are PERSON_TABLE_COLUMNS.

    Rows are inserted with INSERT OR IGNORE.

    Returns:
        OK on success, ID_ERROR on an integrity violation. Other exceptions
        propagate to the caller.
    """
    rows = pd.read_excel(xlsx_file_name)[PERSON_TABLE_COLUMNS].values.tolist()
    placeholders = ", ".join(["?"] * len(PERSON_FEATURE_LIST))
    sql = f"INSERT OR IGNORE INTO PERSON ({', '.join(PERSON_FEATURE_LIST)}) VALUES ({placeholders});"
    try:
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        print(traceback.format_exc())
        return ID_ERROR
    return OK
# 从json导入一个数据
def insert_many_person_from_json(self, data):
    """Insert several persons given as dicts containing every PERSON_FEATURE_LIST key.

    Returns:
        OK on success, ID_ERROR on an integrity violation. Other exceptions
        (including a missing key) propagate to the caller.
    """
    placeholders = ", ".join(["?"] * len(PERSON_FEATURE_LIST))
    sql = f"INSERT INTO PERSON ({', '.join(PERSON_FEATURE_LIST)}) VALUES ({placeholders});"
    try:
        rows = []
        for person in data:
            rows.append([person[feature] for feature in PERSON_FEATURE_LIST])
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
# 删除人员信息
def delete_a_person(self, _id):
    """Delete one person together with all of their scores.

    Returns:
        NO_OBJ when no person has this id, OK on success, UNKNOWN_ERROR when
        the deletion raises.
    """
    found = self.database.execute("SELECT * FROM PERSON WHERE id = ?", (_id,))
    if not found:
        return NO_OBJ
    try:
        for statement in (
            "DELETE FROM PERSON WHERE id = ?",
            "DELETE FROM SCORE WHERE id = ?",
        ):
            self.database.execute(statement, (_id,))
        self.database.commit()
        return OK
    except Exception as exc:
        print(exc)
        return UNKNOWN_ERROR
# 删除所有人员信息
def delete_all_person(self):
    """Delete every person and every score.

    Returns:
        OK on success, UNKNOWN_ERROR when the deletion raises.
    """
    try:
        for statement in ("DELETE FROM PERSON", "DELETE FROM SCORE"):
            self.database.execute(statement)
        self.database.commit()
        return OK
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
# 获得某个人员的信息
def select_a_person(self, _id):
    """Return the persons with the given id as dicts keyed by PERSON_FEATURE_LIST."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM PERSON WHERE id = ?;", (_id,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
# 获得某个人员的信息
def select_a_person_from_band(self, band_id):
    """Return the persons wearing the given band as dicts keyed by PERSON_FEATURE_LIST."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(
        f"SELECT {columns} FROM PERSON WHERE band_id = ?;", (band_id,)
    )
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
# 获取一个班级的所有人员信息
def get_a_class(self, _class):
    """Return the members of one class; NULL fields are replaced by ''."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(
        f"SELECT {columns} FROM PERSON WHERE class = ?;", (_class,)
    )
    return [
        {
            field: ('' if value is None else value)
            for field, value in zip(PERSON_FEATURE_LIST, row)
        }
        for row in rows
    ]
# 获得所有人员的信息
def select_all_person(self):
    """Return every person; NULL fields are replaced by ''."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM PERSON;")
    return [
        {
            field: ('' if value is None else value)
            for field, value in zip(PERSON_FEATURE_LIST, row)
        }
        for row in rows
    ]
# 获得一个班的人员的信息
def select_class_person(self, _class):
    """Return the members of one class with their fields exactly as stored."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(
        f"SELECT {columns} FROM PERSON WHERE class = ?;", (_class,)
    )
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
# 修改指定人员的信息
def update_a_person(self, data):
    """Overwrite one person's record and sync their name/class into SCORE.

    Args:
        data: dict keyed by PERSON_FEATURE_LIST; ``data[ID]`` selects the
            person. Missing keys are written as NULL.

    Returns:
        OK when the person exists and was updated, NO_OBJ otherwise.
    """
    person = self.database.execute("SELECT * FROM PERSON WHERE id = ?", (data[ID],))
    if not person:
        return NO_OBJ
    person_id = data.get(ID)
    assignments = ", ".join("{}=?".format(feature) for feature in PERSON_FEATURE_LIST)
    self.database.execute(
        "UPDATE PERSON SET {} WHERE id = ?".format(assignments),
        [data.get(column) for column in PERSON_FEATURE_LIST] + [person_id],
    )
    # SCORE duplicates only the person's name and class. The result columns
    # (batch, score_type, record, score) are not part of a person record and
    # must not be touched, otherwise every score of this person is
    # overwritten with NULL.
    self.database.execute(
        "UPDATE SCORE SET {}=?, {}=? WHERE id = ?".format(NAME, CLASS),
        [data.get(NAME), data.get(CLASS), person_id],
    )
    self.database.commit()
    return OK
# 搜索一个名字
def select_a_person_from_name(self, name):
    """Return the persons whose name contains ``name``; NULL fields become ''."""
    # The search text is bound as a parameter, never formatted into the SQL,
    # so quotes in a name can neither break nor alter the statement.
    sql = "SELECT {} FROM PERSON WHERE NAME LIKE ?;".format(", ".join(PERSON_FEATURE_LIST))
    data = self.database.execute(sql, (f"%{name}%",))
    return [
        {
            field: (value if value is not None else '')
            for field, value in zip(PERSON_FEATURE_LIST, row)
        }
        for row in data
    ]
# 搜索一个名字
def select_a_score_from_name(self, name):
    """Return the SCORE rows whose name contains ``name`` as dicts."""
    # The search text is bound as a parameter, never formatted into the SQL,
    # so quotes in a name can neither break nor alter the statement.
    sql = "SELECT {} FROM SCORE WHERE NAME LIKE ?;".format(", ".join(SCORE_FEATURE_LIST))
    data = self.database.execute(sql, (f"%{name}%",))
    return [dict(zip(SCORE_FEATURE_LIST, row)) for row in data]
# 导出所有人员信息
def dump_person(self, save_dir):
    """Export every person to an Excel file headed by PERSON_TABLE_COLUMNS.

    Args:
        save_dir: path of the Excel file to write.
    """
    # Naming the columns up front selects them by key in the documented
    # order and keeps all of them even when the table is empty, so the
    # header list always matches the frame.
    df = pd.DataFrame(self.select_all_person(), columns=PERSON_FEATURE_LIST)
    df.to_excel(save_dir, index=False, header=PERSON_TABLE_COLUMNS)
# 从Excel导入成绩
def insert_many_score_from_xlsx(self, xlsx_file_dir):
    """Import scores from an Excel sheet whose headers are SCORE_TABLE_COLUMNS.

    Returns:
        OK on success, ID_ERROR on an integrity violation. Other exceptions
        propagate to the caller.
    """
    rows = pd.read_excel(xlsx_file_dir)[SCORE_TABLE_COLUMNS].values.tolist()
    placeholders = ", ".join(["?"] * len(SCORE_FEATURE_LIST))
    sql = f"INSERT INTO SCORE ({', '.join(SCORE_FEATURE_LIST)}) VALUES ({placeholders});"
    try:
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
# 从json导入成绩
def insert_many_score_from_json(self, data):
    """Insert several scores given as dicts containing every SCORE_FEATURE_LIST key.

    Returns:
        OK on success, ID_ERROR on an integrity violation. Other exceptions
        (including a missing key) propagate to the caller.
    """
    placeholders = ", ".join(["?"] * len(SCORE_FEATURE_LIST))
    sql = f"INSERT INTO SCORE ({', '.join(SCORE_FEATURE_LIST)}) VALUES ({placeholders});"
    try:
        rows = []
        for entry in data:
            rows.append([entry[feature] for feature in SCORE_FEATURE_LIST])
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
# Delete all scores of one person
def delete_a_score(self, _id):
    """Delete every score of one person.

    Returns:
        NO_OBJ when the person has no scores, OK on success, UNKNOWN_ERROR
        when the deletion raises.
    """
    found = self.database.execute("SELECT * FROM SCORE WHERE id = ?", (_id,))
    if not found:
        return NO_OBJ
    try:
        self.database.execute("DELETE FROM SCORE WHERE id = ?", (_id,))
        self.database.commit()
    except Exception as exc:
        print(exc)
        return UNKNOWN_ERROR
    return OK
# 删除所有人员成绩
def clear_all_score(self):
    """Delete every row of the SCORE table.

    Returns:
        OK on success, UNKNOWN_ERROR when the deletion raises.
    """
    try:
        self.database.execute("DELETE FROM SCORE;")
        self.database.commit()
    except Exception as exc:
        print(exc)
        return UNKNOWN_ERROR
    return OK
# 删除所有人员成绩
def delete_all_score(self):
    """Delete every row of the SCORE table.

    Kept as an alias of ``clear_all_score`` so the two entry points cannot
    drift apart.

    Returns:
        Whatever ``clear_all_score`` returns.
    """
    return self.clear_all_score()
# 查询成绩是否存在
def select_a_score(self, person_id, score_type, batch=None):
    """Look up the scores of one person for one exercise and batch.

    With ``batch=None`` only rows whose batch is NULL are matched.

    Returns:
        A non-empty list of dicts keyed by SCORE_FEATURE_LIST, ``None`` when
        nothing matches, or UNKNOWN_ERROR when the query raises.
    """
    if batch is not None:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch = ? AND score_type = ?;"
        params = (person_id, batch, score_type)
    else:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch IS NULL AND score_type = ?;"
        params = (person_id, score_type)
    try:
        found = [
            dict(zip(SCORE_FEATURE_LIST, row))
            for row in self.database.execute(sql, params)
        ]
    except Exception as exc:
        print(exc)
        return UNKNOWN_ERROR
    return found if found else None
# 插入一条成绩
def insert_a_score(self, person_id, name, person_class, score_type, record, score, batch=None):
    """Insert one score row; the batch column is left unset when ``batch`` is None.

    Returns:
        OK on success, UNKNOWN_ERROR when the insert raises.
    """
    if batch is not None:
        sql = "INSERT INTO SCORE (id, name, class, batch, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?, ?);"
        params = (person_id, name, person_class, batch, score_type, record, score)
    else:
        sql = "INSERT INTO SCORE (id, name, class, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?);"
        params = (person_id, name, person_class, score_type, record, score)
    try:
        self.database.execute(sql, params)
        self.database.commit()
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
# 插入计时类项目成绩
def update_timekeeping_score(self, record, score, person_id, score_type, batch=None):
    """Store the record and score of a timed exercise on existing SCORE rows.

    Nothing is written when ``record`` is None or -1. With ``batch=None``
    the update is not restricted to a batch.

    Returns:
        OK on success, ID_ERROR on an integrity violation, UNKNOWN_ERROR on
        any other failure, ``None`` when the record was skipped.
    """
    if record is None or record == -1:
        return None
    if batch is not None:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND batch = ? AND score_type = ?;
            """
        params = (record, score, person_id, batch, score_type)
    else:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND score_type = ?;
            """
        params = (record, score, person_id, score_type)
    try:
        self.database.execute(sql, params)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
# 插入计数类项目成绩
def update_counting_score(self, record, score, person_id, score_type, batch=None):
    """Store the record and score of a counted exercise on existing SCORE rows.

    Nothing is written when ``record`` is falsy and not equal to 0 (for
    example None or ''). With ``batch=None`` the update is not restricted to
    a batch.

    Returns:
        OK on success, ID_ERROR on an integrity violation, UNKNOWN_ERROR on
        any other failure, ``None`` when the record was skipped.
    """
    if not (record or record == 0):
        return None
    if batch is not None:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND batch = ? AND score_type = ?;
            """
        params = (record, score, person_id, batch, score_type)
    else:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND score_type = ?;
            """
        params = (record, score, person_id, score_type)
    try:
        self.database.execute(sql, params)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as exc:
        print(f"数据库错误,错误原因{exc.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
# 获得所有人员的成绩
def select_all_score(self, batch=None):
    """Return one score sheet per person.

    Each sheet is the person record (NULL fields as '') extended with
    ``<prefix>_score`` / ``<prefix>_count`` for every exercise and a
    ``final_score`` holding the sum of all ``*_score`` values rounded to two
    decimals. Exercises without a result keep ''.

    Args:
        batch: when truthy, only scores of that batch are used; otherwise
            all scores are used and later rows overwrite earlier ones.
    """
    columns = ", ".join(SCORE_FEATURE_LIST)
    if batch:
        rows = self.database.execute(
            "SELECT {} FROM SCORE WHERE batch = ?;".format(columns), (batch,)
        )
    else:
        rows = self.database.execute("SELECT {} FROM SCORE;".format(columns))

    # Two exercises use a sheet-column prefix that differs from their key.
    prefix_of = {RUNAROUND: 'run_bf', OVERHANG: 'hanging'}

    person_data = self.select_all_person()
    # id is the primary key of PERSON, so one sheet per id is enough and
    # lets every score row find its sheet without rescanning all persons.
    sheet_by_id = {}
    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            prefix = prefix_of.get(exercise, exercise)
            person[prefix + '_score'] = ''
            person[prefix + '_count'] = ''
        sheet_by_id[person['id']] = person

    for row in rows:
        entry = dict(zip(SCORE_FEATURE_LIST, row))
        person = sheet_by_id.get(entry['id'])
        score_type = entry['score_type']
        if person is None or score_type is None:
            continue
        prefix = prefix_of.get(score_type, score_type)
        person[prefix + '_score'] = entry['score']
        try:
            person[prefix + '_count'] = round(float(entry['record']), 2)
        except (TypeError, ValueError):
            # NULL or non-numeric record: leave the cell empty.
            person[prefix + '_count'] = ''

    for person in person_data:
        total = 0.0
        for key, value in list(person.items()):
            if key == 'running_count' and value != '':
                person[key] = self.format_time(float(value))
            elif key.endswith('_score') and key != 'final_score':
                # Empty or NULL scores count as 0.
                if value not in ('', None):
                    total += float(value)
        person['final_score'] = round(total, 2)
    return person_data
# 获得score表
def select_score(self):
    """Return every SCORE row as a dict keyed by SCORE_FEATURE_LIST."""
    columns = ", ".join(SCORE_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM SCORE;")
    return [dict(zip(SCORE_FEATURE_LIST, row)) for row in rows]
# Get the per-person score sheets, optionally filtered by batch, class and name
def get_a_class_score(self, batch=None, _class=None, name=None):
    """Return the per-person score sheets filtered by batch, class and name.

    This applies the same filters and builds the same sheets as
    ``select_score_manager_pyqt``, so it delegates to that method; only the
    parameter order differs.

    Args:
        batch: batch name the scores must belong to, or None for all.
        _class: class the persons and scores must belong to, or None for all.
        name: substring the name must contain, or None for all.

    Returns:
        Whatever ``select_score_manager_pyqt`` returns for these filters.
    """
    return self.select_score_manager_pyqt(_class=_class, batch=batch, name=name)
# 导出所有人员成绩
def dump_score(self, save_dir):
    """Export every person's score sheet to an Excel file.

    The file is headed by SCORE_TYPE_TABLE_COLUMNS.

    Args:
        save_dir: path of the Excel file to write.
    """
    # Naming the columns up front selects them by key in the documented
    # order and keeps all of them even when there are no persons, so the
    # header list always matches the frame.
    df = pd.DataFrame(self.select_all_score(), columns=SCORE_TYPE_FEATURE_LIST)
    df.to_excel(save_dir, index=False, header=SCORE_TYPE_TABLE_COLUMNS)
def format_time(self, seconds):
    """Format a duration in seconds as HH:MM:SS, prefixed with '-' if negative.

    Fractions of a second are truncated.
    """
    value = float(seconds)
    sign = "-" if value < 0 else ""
    minutes, secs = divmod(abs(value), 60)
    hours, minutes = divmod(minutes, 60)
    return sign + f"{int(hours):02d}:{int(minutes):02d}:{int(secs):02d}"
def select_person_manager_pyqt(self,_class, name):
    """Return persons filtered by class and/or name substring; NULL fields become ''.

    A filter that is None is not applied.
    """
    clauses = ["class = ?"] if _class is not None else ["1 = 1"]
    params = [_class] if _class is not None else []
    if name is not None:
        clauses.append("name LIKE ?")
        params.append(f'%{name}%')
    sql = "SELECT {} FROM PERSON WHERE {};".format(
        ", ".join(PERSON_FEATURE_LIST),
        " AND ".join(clauses)
    )
    rows = self.database.execute(sql, tuple(params))
    return [
        {
            field: ('' if value is None else value)
            for field, value in zip(PERSON_FEATURE_LIST, row)
        }
        for row in rows
    ]
def select_score_manager_pyqt(self, _class=None, batch=None, name=None):
    """Return the per-person score sheets filtered by class, batch and name.

    Each sheet is the person record from ``select_person`` extended with
    ``<prefix>_score`` / ``<prefix>_count`` for every exercise and a
    ``final_score`` holding the sum of all ``*_score`` values rounded to two
    decimals. Exercises without a result keep ''.

    Args:
        _class: class the persons and scores must belong to, or None for all.
        batch: batch name the scores must belong to, or None for all.
        name: substring the name must contain, or None for all.
    """
    conditions = []
    params = []
    if batch is not None:
        conditions.append("batch = ?")
        params.append(batch)
    if _class is not None:
        conditions.append("class = ?")
        params.append(_class)
    if name is not None:
        conditions.append("NAME LIKE ?")
        params.append(f'%{name}%')
    if conditions:
        sql = "SELECT {} FROM SCORE WHERE {};".format(
            ", ".join(SCORE_FEATURE_LIST),
            " AND ".join(conditions)
        )
    else:
        sql = "SELECT {} FROM SCORE;".format(", ".join(SCORE_FEATURE_LIST))
    rows = self.database.execute(sql, tuple(params))

    # Two exercises use a sheet-column prefix that differs from their key.
    prefix_of = {RUNAROUND: 'run_bf', OVERHANG: 'hanging'}

    person_data = self.select_person(_class=_class, name=name)
    # id is the primary key of PERSON, so one sheet per id is enough and
    # lets every score row find its sheet without rescanning all persons.
    sheet_by_id = {}
    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            prefix = prefix_of.get(exercise, exercise)
            person[prefix + '_score'] = ''
            person[prefix + '_count'] = ''
        sheet_by_id[person['id']] = person

    for row in rows:
        entry = dict(zip(SCORE_FEATURE_LIST, row))
        person = sheet_by_id.get(entry['id'])
        score_type = entry['score_type']
        if person is None or score_type is None:
            continue
        prefix = prefix_of.get(score_type, score_type)
        person[prefix + '_score'] = entry['score']
        try:
            person[prefix + '_count'] = round(float(entry['record']), 2)
        except (TypeError, ValueError):
            # NULL or non-numeric record: leave the cell empty.
            person[prefix + '_count'] = ''

    for person in person_data:
        total = 0.0
        for key, value in list(person.items()):
            if key == 'running_count' and value != '':
                person[key] = self.format_time(float(value))
            elif key.endswith('_score') and key != 'final_score':
                # Empty or NULL scores count as 0.
                if value not in ('', None):
                    total += float(value)
        person['final_score'] = round(total, 2)
    return person_data
def select_person(self, _class=None, name=None):
    """Return persons filtered by class and/or name substring, fields as stored.

    A filter that is None is not applied.
    """
    # "1 = 1" keeps the WHERE clause valid when no class filter is given.
    clauses = ["class = ?"] if _class is not None else ["1 = 1"]
    params = [_class] if _class is not None else []
    if name is not None:
        clauses.append("name LIKE ?")
        params.append(f'%{name}%')
    sql = "SELECT {} FROM PERSON WHERE {};".format(
        ", ".join(PERSON_FEATURE_LIST),
        " AND ".join(clauses)
    )
    rows = self.database.execute(sql, tuple(params))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]