LISHUZUOXUN_yangjiang/Database/database_pyqt.py

1255 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sqlite3
import time
import traceback
import pandas as pd
from AcrossPlatform.get_platform import GLOBAL_DIR
from SQLDataBase.sql_server import SQLServer
# Directory and file name handed to SQLServer when Database is constructed.
MANAGER_DIR = os.path.join(GLOBAL_DIR, "Manager")
MANAGER_DATABASE = "complete_database.db"
# Person attributes: column names of the PERSON table.
ID = "id"
NAME = "name"
BAND_ID = "band_id"
CLASS = "class"
GENDER = "gender"
AGE = "age"
HEIGHT = "height"
WEIGHT = "weight"
BMI = "bmi"
PBF = "pbf"
PERSON_TYPE = "person_type"
# Column order used for every PERSON SELECT/INSERT in this module.
PERSON_FEATURE_LIST = [
ID, NAME, BAND_ID, CLASS, GENDER, AGE, HEIGHT, WEIGHT, BMI, PBF, PERSON_TYPE
]
# Spreadsheet header labels, matched by position to PERSON_FEATURE_LIST
# when importing from / exporting to Excel.
PERSON_TABLE_COLUMNS = ["人员编号", "姓名", "手环", "所在班", "性别", "年龄", "身高", "体重", "BMI", "PBF", "人员类型"]
# Score attributes: additional column names of the SCORE table and keys of
# the statistics dictionaries.
BATCH = "batch"
SCORE_TYPE = "score_type"
RECORD = "record"
SCORE = "score"
COUNT = 'count'
GRADE = "grade"
# Exercise (score_type) identifiers.
PUSHUP = "pushup"
SITUP = "situp"
OVERHANG = "overhang"
RUNAROUND = "runaround"
RUNNING = "running"
PULLUP = "pullup"
# Despite the name this is a dict: exercise identifier -> display name.
EXERCISE_NAME_LIST = {
PUSHUP: "俯卧撑",
SITUP: "仰卧起坐",
OVERHANG: "曲臂悬垂",
RUNAROUND: "30*2蛇形跑",
RUNNING: "长跑",
PULLUP: "引体向上"
}
# Grade identifiers returned by get_grade().
GRADE_GREAT = "GREAT"
GRADE_GOOD = "GOOD"
GRADE_PASS = "PASS"
GRADE_FAIL = "FAIL"
GRADE_MISS = "MISS"
# Despite the name this is a dict: grade identifier -> display name.
GRADE_LIST = {
GRADE_GREAT: "优秀",
GRADE_GOOD: "良好",
GRADE_PASS: "及格",
GRADE_FAIL: "不及格",
GRADE_MISS: "缺考"
}
# Grade identifiers listed from GRADE_GREAT down to GRADE_MISS.
GRADE_RANK = [
GRADE_GREAT,
GRADE_GOOD,
GRADE_PASS,
GRADE_FAIL,
GRADE_MISS
]
# Keys used inside the statistics result dictionaries.
DATA = "data"
PERCENTAGE = "percentage"
BATCH_AVG = "batch_avg"
TYPE_AVG = "type_avg"
def get_grade(score):
    """Translate a numeric score into a grade identifier.

    Scores of 90 and above are GRADE_GREAT, [70, 90) GRADE_GOOD,
    [60, 70) GRADE_PASS and [0, 60) GRADE_FAIL. Any value that satisfies
    none of these comparisons (for example a negative number) is
    GRADE_MISS.
    """
    lower_bounds = (
        (90, GRADE_GREAT),
        (70, GRADE_GOOD),
        (60, GRADE_PASS),
        (0, GRADE_FAIL),
    )
    # Bounds are checked from highest to lowest, so the first hit wins.
    for lower_bound, grade in lower_bounds:
        if score >= lower_bound:
            return grade
    return GRADE_MISS
# Full score attributes: column order used for every SCORE SELECT/INSERT.
SCORE_FEATURE_LIST = [
ID, NAME, CLASS, BATCH, SCORE_TYPE, RECORD, SCORE
]
# Spreadsheet header labels, matched by position to SCORE_FEATURE_LIST.
SCORE_TABLE_COLUMNS = [
"人员编号", "姓名", "所在班", "批次", "课目", "记录", "成绩"
]
# Per-exercise columns of the merged person/score rows built by the
# select_*_score methods ("<prefix>_count" holds the record,
# "<prefix>_score" the score).
PULLUP_COUNT = "pullup_count"
PULLUP_SCORE = "pullup_score"
HANGING_COUNT = "hanging_count"
HANGING_SCORE = "hanging_score"
PUSHUP_COUNT = "pushup_count"
PUSHUP_SCORE = "pushup_score"
SITUP_COUNT = "situp_count"
SITUP_SCORE = "situp_score"
RUNNING_COUNT = "running_count"
RUNNING_SCORE = "running_score"
RUN_BF_COUNT = "run_bf_count"
RUN_BF_SCORE = "run_bf_score"
FINAL_SCORE = "final_score"
# Column order of the merged person/score export written by dump_score().
SCORE_TYPE_FEATURE_LIST = [
ID, NAME, BAND_ID, CLASS, GENDER, AGE, HEIGHT, WEIGHT, BMI, PBF, PERSON_TYPE, PUSHUP_SCORE, PUSHUP_COUNT,
SITUP_SCORE, SITUP_COUNT, HANGING_SCORE, HANGING_COUNT, RUN_BF_SCORE, RUN_BF_COUNT, RUNNING_SCORE, RUNNING_COUNT,
PULLUP_SCORE, PULLUP_COUNT, FINAL_SCORE
]
# Spreadsheet header labels, matched by position to SCORE_TYPE_FEATURE_LIST.
# NOTE(review): positions 9 and 10 hold BMI and PBF in the feature list but
# are labelled "心率" and "血氧" here, whereas PERSON_TABLE_COLUMNS labels the
# same fields "BMI" and "PBF" - confirm which labels are intended.
SCORE_TYPE_TABLE_COLUMNS = [
"人员编号", "姓名", "手环编号", "所在班", "性别", "年龄", "身高", "体重", "心率", "血氧", "人员类型", "俯卧撑成绩",
"俯卧撑个数", "仰卧起坐成绩", "仰卧起坐个数", "曲臂悬垂成绩", "曲臂悬垂时间(秒)", "蛇形跑成绩", "蛇形跑时间(秒)",
"长跑成绩", "长跑时间(秒)", "引体向上成绩", "引体向上个数", "最终成绩"
]
# Status codes returned by the Database methods.
ID_ERROR = 1
UNKNOWN_ERROR = -1
OK = 0
NO_OBJ = 2
class Database:
    """Data-access layer over the PERSON, SCORE and BATCH_INFO tables."""

    def __init__(self) -> None:
        """Open the manager database and make sure all three tables exist."""
        super().__init__()
        self.database = SQLServer(server_dir=MANAGER_DIR, server_name=MANAGER_DATABASE)
        table_creators = (
            self._create_person_table,
            self._create_score_table,
            self._create_batch_table,
        )
        for create_table in table_creators:
            create_table()
def _create_person_table(self):
    """Create the PERSON table (keyed by ``id``) if it does not exist."""
    column_defs = (
        "id CHAR(100) PRIMARY KEY",
        "name CHAR(100)",
        "band_id TEXT",
        "class TEXT",
        "gender CHAR(20)",
        "age INT",
        "height INT",
        "weight INT",
        "bmi FLOAT",
        "pbf FLOAT",
        "person_type INT",
    )
    statement = "CREATE TABLE IF NOT EXISTS PERSON (" + ",".join(column_defs) + ");"
    self.database.execute(statement)
def _create_score_table(self):
    """Create the SCORE table if it does not exist."""
    column_defs = (
        "id CHAR(100)",
        "name CHAR(100)",
        "class TEXT",
        "batch TEXT",
        "score_type TEXT",
        "record TEXT",
        "score FLOAT",
    )
    statement = "CREATE TABLE IF NOT EXISTS SCORE (" + ",".join(column_defs) + ");"
    self.database.execute(statement)
def _create_batch_table(self):
    """Create the BATCH_INFO table (batch name + start time) if missing."""
    column_defs = ("batch TEXT", "start_time INT")
    statement = "CREATE TABLE IF NOT EXISTS BATCH_INFO (" + ",".join(column_defs) + ");"
    self.database.execute(statement)
def get_this_batch(self):
    """Return the name of the current (most recently started) batch.

    Returns:
        The ``batch`` value of the BATCH_INFO row with the largest
        ``start_time``, or ``None`` if the table is empty or the query
        fails.
    """
    sql = "SELECT batch FROM BATCH_INFO " \
          "ORDER BY start_time DESC " \
          "LIMIT 1"
    try:
        # The query selects a single column, so the batch name is simply
        # the first field of the first row.
        for row in self.database.execute(sql):
            return row[0]
        return None
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def generate_batch(self):
    """Register a batch named after the current local date and hour.

    A BATCH_INFO row is inserted only when no row with the same name
    exists yet.

    Returns:
        The batch name (whether or not a row was inserted), or ``None``
        if the database raised.
    """
    now = int(time.time())
    batch_name = time.strftime("%Y年%m月%d%H时", time.localtime(now))
    try:
        already_there = self.database.execute(
            "SELECT batch, start_time FROM BATCH_INFO WHERE batch = ?;",
            (batch_name,),
        )
        if not already_there:
            self.database.execute(
                "INSERT INTO BATCH_INFO (batch, start_time) VALUES (?, ?);",
                (batch_name, now),
            )
            self.database.commit()
        return batch_name
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_score(self):
    """Return every SCORE row as a dict keyed by SCORE_FEATURE_LIST.

    Returns:
        A list of dicts, or ``None`` if the query raised.
    """
    sql = "SELECT " + ", ".join(SCORE_FEATURE_LIST) + " FROM SCORE"
    try:
        rows = self.database.execute(sql)
        scores = []
        for row in rows:
            scores.append(dict(zip(SCORE_FEATURE_LIST, row)))
        return scores
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_batch(self):
    """Return the distinct batch names ordered by ``start_time``.

    Returns:
        A list of batch names, or ``None`` if the query raised.
    """
    sql = "SELECT {0} FROM BATCH_INFO GROUP BY {0} ORDER BY start_time".format(BATCH)
    try:
        batches = []
        for row in self.database.execute(sql):
            batches.append(row[0])
        return batches
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_id(self):
    """Return all distinct person ids in ascending order.

    Returns:
        A sorted list of ids, or ``None`` if the query or the sort raised.
    """
    sql = "SELECT {0} FROM PERSON GROUP BY {0}".format(ID)
    try:
        rows = self.database.execute(sql)
        return sorted(row[0] for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_name(self):
    """Return a mapping of person id to name for every PERSON row.

    Returns:
        ``{id: name}`` built in ascending id order, or ``None`` if the
        query raised.
    """
    # No GROUP BY on the name: grouping would keep only one id per distinct
    # name, so people who share a name would be missing from the mapping.
    sql = f"SELECT {ID}, {NAME} FROM PERSON ORDER BY {ID}"
    try:
        data = self.database.execute(sql)
        return {row[0]: row[1] for row in data}
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_id_class(self):
    """Return a mapping of person id to class for every PERSON row.

    Returns:
        ``{id: class}`` built in ascending id order, or ``None`` if the
        query raised.
    """
    sql = "SELECT {}, {} FROM PERSON ORDER BY {}".format(ID, CLASS, ID)
    try:
        id_to_class = {}
        for row in self.database.execute(sql):
            id_to_class[row[0]] = row[1]
        return id_to_class
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_class(self):
    """Return all distinct class values of PERSON in ascending order.

    Returns:
        A sorted list of class values, or ``None`` if the query or the
        sort raised.
    """
    sql = "SELECT {0} FROM PERSON GROUP BY {0}".format(CLASS)
    try:
        classes = [row[0] for row in self.database.execute(sql)]
        return sorted(classes)
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
@staticmethod
def get_all_exercise_types():
    """Return the exercise identifiers (the keys of EXERCISE_NAME_LIST)."""
    return [exercise for exercise in EXERCISE_NAME_LIST]
def get_exercise_statistic(self, type_list, batch):
    """Summarise grades per exercise for the given batch(es).

    Args:
        type_list: Exercise identifiers to report on.
        batch: A batch name or a collection of batch names. A single
            string is treated as one batch name.

    Returns:
        ``{exercise: {grade: {DATA: [...], PERCENTAGE: float}}}`` where each
        DATA entry holds the person's id, name, score and record. People
        without a matching score are listed under GRADE_MISS. If anything
        raises, the error is printed and the table is returned as filled
        so far.
    """
    grade_table = {
        _type: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for _type in type_list
    }
    # ``x in "some string"`` is a substring test; wrap a bare batch name so
    # the filter below is an exact match.
    batches = [batch] if isinstance(batch, str) else batch
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        # Everybody starts out as "missed" for every requested exercise.
        person_group = {
            _id: {
                _type: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for _type in type_list
            }
            for _id in all_id
        }
        for data in full_data:
            if data[SCORE_TYPE] not in type_list or data[BATCH] not in batches:
                continue
            entry = person_group.get(data[ID], {}).get(data[SCORE_TYPE])
            if entry is None:
                # Score row whose id is not in PERSON: skip it instead of
                # aborting the whole statistic with a KeyError.
                continue
            entry[GRADE] = get_grade(data[SCORE])
            entry[SCORE] = data[SCORE]
            entry[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            for _type, type_values in id_values.items():
                grade_table[_type][type_values[GRADE]][DATA].append({
                    ID: _id,
                    NAME: all_name.get(_id),
                    SCORE: type_values[SCORE],
                    RECORD: type_values[RECORD],
                })
        # Percentages are relative to the total head count.
        head_count = len(person_group)
        if head_count:
            for type_values in grade_table.values():
                for grade_values in type_values.values():
                    grade_values[PERCENTAGE] = len(grade_values[DATA]) / head_count
    except Exception as e:
        print(e.args, traceback.format_exc())
    return grade_table
def get_class_statistic(self, batch, _type):
    """Summarise grades per class for one exercise in one batch.

    Args:
        batch: Batch name the scores must belong to.
        _type: Exercise identifier.

    Returns:
        ``{class: {grade: {DATA: [...], PERCENTAGE: float}}}``. People
        without a matching score are listed under GRADE_MISS of their own
        class; percentages are relative to the class head count. If
        anything raises, the error is printed and the table is returned as
        filled so far.
    """
    all_class = self.get_all_class() or []
    class_table = {
        _class: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for _class in all_class
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        all_id_class = self.get_id_class()
        # Take the class from PERSON up front; deriving it from score rows
        # would leave people without a score with no class to count under.
        person_group = {
            _id: {
                GRADE: GRADE_MISS,
                SCORE: 0,
                RECORD: None,
                CLASS: all_id_class.get(_id),
            }
            for _id in all_id
        }
        for data in full_data:
            if data[SCORE_TYPE] != _type or data[BATCH] != batch:
                continue
            entry = person_group.get(data[ID])
            if entry is None:
                # Score row whose id is not in PERSON.
                continue
            entry[GRADE] = get_grade(data[SCORE])
            entry[SCORE] = data[SCORE]
            entry[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            class_values = class_table.get(id_values[CLASS])
            if class_values is None:
                continue
            class_values[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD],
            })
        for class_values in class_table.values():
            class_size = sum(len(grade_values[DATA]) for grade_values in class_values.values())
            if not class_size:
                continue
            for grade_values in class_values.values():
                grade_values[PERCENTAGE] = len(grade_values[DATA]) / class_size
    except Exception as e:
        print(e.args, traceback.format_exc())
    return class_table
def get_group_statistic(self, _type):
    """Summarise grades per batch for one exercise.

    Args:
        _type: Exercise identifier.

    Returns:
        ``{batch: {grade: {DATA: [...], PERCENTAGE: float}}}`` covering
        every batch in BATCH_INFO. People without a score in a batch are
        listed under GRADE_MISS for that batch. If anything raises, the
        error is printed and the table is returned as filled so far.
    """
    all_batch = self.get_all_batch() or []
    batch_table = {
        batch: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for batch in all_batch
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        person_group = {
            _id: {
                batch: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for batch in all_batch
            }
            for _id in all_id
        }
        for data in full_data:
            if data[SCORE_TYPE] != _type:
                continue
            entry = person_group.get(data[ID], {}).get(data[BATCH])
            if entry is None:
                # Unknown person or a batch that is not in BATCH_INFO
                # (including a NULL batch): nothing to attribute it to.
                continue
            entry[GRADE] = get_grade(data[SCORE])
            entry[SCORE] = data[SCORE]
            entry[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            for batch, batch_values in id_values.items():
                batch_table[batch][batch_values[GRADE]][DATA].append({
                    ID: _id,
                    NAME: all_name.get(_id),
                    SCORE: batch_values[SCORE],
                    RECORD: batch_values[RECORD],
                })
        for batch_values in batch_table.values():
            batch_size = sum(len(grade_values[DATA]) for grade_values in batch_values.values())
            if not batch_size:
                continue
            for grade_values in batch_values.values():
                grade_values[PERCENTAGE] = len(grade_values[DATA]) / batch_size
    except Exception as e:
        print(e.args, traceback.format_exc())
    return batch_table
def get_group_multiple_statistic(self, _type, class_list, batch):
    """List the people of the selected classes by grade.

    Args:
        _type: Exercise identifier.
        class_list: Classes to include.
        batch: Batch name the scores must belong to.

    Returns:
        ``{grade: {DATA: [...]}}`` where each DATA entry holds the person's
        id, name, score and record. People of the selected classes without
        a matching score are listed under GRADE_MISS. If anything raises,
        the error is printed and the table is returned as filled so far.
    """
    table = {grade: {DATA: []} for grade in GRADE_LIST}
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        id_class = self.get_id_class()
        person_group = {
            _id: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
            for _id in all_id
            if id_class.get(_id) in class_list
        }
        for data in full_data:
            if data[SCORE_TYPE] != _type or data[BATCH] != batch:
                continue
            if data[CLASS] not in class_list:
                continue
            entry = person_group.get(data[ID])
            if entry is None:
                # The class stored on the score row is selected but the
                # person is unknown or now in another class: skip the row.
                continue
            entry[GRADE] = get_grade(data[SCORE])
            entry[SCORE] = data[SCORE]
            entry[RECORD] = data[RECORD]
        for _id, id_values in person_group.items():
            table[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD],
            })
    except Exception as e:
        print(e.args, traceback.format_exc())
    return table
def get_individual_statistic(self, _id, _type_list, batch_list):
    """Collect one person's scores per batch and exercise.

    Args:
        _id: Person id.
        _type_list: Exercise identifiers to include.
        batch_list: Batch names to include.

    Returns:
        A dict with three entries: BATCH maps batch -> exercise ->
        ``{SCORE: value}`` (0 where no score exists), BATCH_AVG maps each
        batch to the sum of its scores divided by ``len(_type_list)``, and
        TYPE_AVG maps each exercise to the largest score seen across the
        batches. If anything raises, the error is printed and the dict is
        returned as filled so far.
    """
    group = {
        BATCH: {},
        BATCH_AVG: dict.fromkeys(batch_list, 0),
        TYPE_AVG: dict.fromkeys(_type_list, 0),
    }
    per_batch = group[BATCH]
    for batch in batch_list:
        per_batch.setdefault(batch, {_type: {SCORE: 0} for _type in _type_list})
    try:
        full_data = self.get_all_score()
        wanted = [
            row for row in full_data.copy()
            if row[ID] == _id
            and row[SCORE_TYPE] in _type_list
            and row[BATCH] in batch_list
        ]
        for row in wanted:
            per_batch[row[BATCH]][row[SCORE_TYPE]][SCORE] = row[SCORE]
        type_count = len(_type_list)
        for batch, batch_values in per_batch.items():
            score_sum = sum([cell[SCORE] for cell in batch_values.values()])
            group[BATCH_AVG][batch] = score_sum / type_count
            # NOTE(review): despite the TYPE_AVG name this keeps the maximum
            # per exercise, not an average - confirm that is intended.
            for _type, cell in batch_values.items():
                group[TYPE_AVG][_type] = max(cell[SCORE], group[TYPE_AVG][_type])
    except Exception as e:
        print(e.args, traceback.format_exc())
    return group
def insert_a_info(self, data):
    """Insert one person.

    Args:
        data: Mapping with the PERSON_FEATURE_LIST keys; missing keys are
            stored as NULL.

    Returns:
        OK on success, ID_ERROR on an integrity error (e.g. duplicate id),
        UNKNOWN_ERROR on any other exception.
    """
    values = [data.get(column) for column in PERSON_FEATURE_LIST]
    column_sql = ", ".join(PERSON_FEATURE_LIST)
    placeholder_sql = ", ".join("?" * len(PERSON_FEATURE_LIST))
    sql = "INSERT INTO PERSON ({}) VALUES ({});".format(column_sql, placeholder_sql)
    try:
        self.database.execute(sql, values)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
def insert_many_person_from_xlsx(self, xlsx_file_name):
    """Import persons from an Excel sheet.

    The sheet must contain the PERSON_TABLE_COLUMNS headers. Rows whose
    insert would violate a constraint are skipped (``INSERT OR IGNORE``).

    Returns:
        OK on success, ID_ERROR on an integrity error.
    """
    sheet = pd.read_excel(xlsx_file_name)
    rows = sheet[PERSON_TABLE_COLUMNS].values.tolist()
    column_sql = ", ".join(PERSON_FEATURE_LIST)
    placeholder_sql = ", ".join("?" * len(PERSON_FEATURE_LIST))
    sql = "INSERT OR IGNORE INTO PERSON ({}) VALUES ({});".format(column_sql, placeholder_sql)
    try:
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        print(traceback.format_exc())
        return ID_ERROR
    return OK
def insert_many_person_from_json(self, data):
    """Insert several persons given as a list of dicts.

    Args:
        data: Iterable of mappings, each containing every
            PERSON_FEATURE_LIST key.

    Returns:
        OK on success, ID_ERROR on an integrity error.
    """
    column_sql = ", ".join(PERSON_FEATURE_LIST)
    placeholder_sql = ", ".join("?" * len(PERSON_FEATURE_LIST))
    sql = "INSERT INTO PERSON ({}) VALUES ({});".format(column_sql, placeholder_sql)
    try:
        rows = []
        for person in data:
            rows.append([person[field] for field in PERSON_FEATURE_LIST])
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
def delete_a_person(self, _id):
    """Delete one person together with all of their scores.

    Returns:
        OK on success, NO_OBJ if no person has this id, UNKNOWN_ERROR if a
        delete raised.
    """
    found = self.database.execute("SELECT * FROM PERSON WHERE id = ?", (_id,))
    if not found:
        return NO_OBJ
    statements = (
        "DELETE FROM PERSON WHERE id = ?",
        "DELETE FROM SCORE WHERE id = ?",
    )
    try:
        for statement in statements:
            self.database.execute(statement, (_id,))
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    return OK
def delete_all_person(self):
    """Delete every person and every score.

    Returns:
        OK on success, UNKNOWN_ERROR if a delete raised.
    """
    try:
        for statement in ("DELETE FROM PERSON", "DELETE FROM SCORE"):
            self.database.execute(statement)
        self.database.commit()
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
def select_a_person(self, _id):
    """Return the PERSON rows with the given id.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST (empty if no match).
    """
    sql = "SELECT {} FROM PERSON WHERE id = ?;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql, (_id,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
def select_a_person_from_band(self, band_id):
    """Return the PERSON rows whose ``band_id`` equals the given value.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST (empty if no match).
    """
    sql = "SELECT {} FROM PERSON WHERE band_id = ?;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql, (band_id,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
def get_a_class(self, _class):
    """Return every person of one class, with NULL fields shown as ``''``.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST.
    """
    sql = "SELECT {} FROM PERSON WHERE class = ?;".format(", ".join(PERSON_FEATURE_LIST))
    people = []
    for row in self.database.execute(sql, (_class,)):
        people.append({
            field: ('' if value is None else value)
            for field, value in zip(PERSON_FEATURE_LIST, row)
        })
    return people
def select_all_person(self):
    """Return every person, with NULL fields shown as ``''``.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST.
    """
    sql = "SELECT {} FROM PERSON;".format(", ".join(PERSON_FEATURE_LIST))
    people = []
    for row in self.database.execute(sql):
        people.append({
            field: ('' if value is None else value)
            for field, value in zip(PERSON_FEATURE_LIST, row)
        })
    return people
def select_class_person(self, _class):
    """Return every person of one class with the raw column values.

    Unlike get_a_class(), NULL fields are returned as ``None``.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST.
    """
    sql = "SELECT {} FROM PERSON WHERE class = ?;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql, (_class,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
def update_a_person(self, data):
    """Overwrite one person's attributes and mirror them onto their scores.

    Args:
        data: Mapping with the PERSON_FEATURE_LIST keys; ``data[ID]``
            selects the person. Missing keys are stored as NULL.

    Returns:
        OK on success, NO_OBJ if no person has this id.
    """
    existing = self.database.execute("SELECT * FROM PERSON WHERE id = ?", (data[ID],))
    if not existing:
        return NO_OBJ

    person_values = [data.get(column) for column in PERSON_FEATURE_LIST] + [data.get(ID)]
    person_sql = "UPDATE PERSON SET {} WHERE id = ?".format(
        ", ".join(f"{feature}=?" for feature in PERSON_FEATURE_LIST)
    )
    self.database.execute(person_sql, person_values)

    # Only the columns SCORE shares with PERSON are mirrored. Writing the
    # score-specific columns (batch, score_type, record, score) from a
    # person dict would overwrite every score row of this person.
    shared_columns = [column for column in SCORE_FEATURE_LIST if column in PERSON_FEATURE_LIST]
    score_values = [data.get(column) for column in shared_columns] + [data.get(ID)]
    score_sql = "UPDATE SCORE SET {} WHERE id = ?".format(
        ", ".join(f"{column}=?" for column in shared_columns)
    )
    self.database.execute(score_sql, score_values)
    self.database.commit()
    return OK
def select_a_person_from_name(self, name):
    """Search persons whose name contains ``name``.

    Args:
        name: Substring to look for.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST, with NULL fields
        shown as ``''``.
    """
    # The search text is bound as a parameter so quotes in it cannot break
    # or alter the statement.
    sql = "SELECT {} FROM PERSON WHERE NAME LIKE ?;".format(", ".join(PERSON_FEATURE_LIST))
    data = self.database.execute(sql, (f"%{name}%",))
    return [
        {
            field: ('' if value is None else value)
            for field, value in zip(PERSON_FEATURE_LIST, row)
        }
        for row in data
    ]
def select_a_score_from_name(self, name):
    """Search score rows whose name contains ``name``.

    Args:
        name: Substring to look for.

    Returns:
        A list of dicts keyed by SCORE_FEATURE_LIST.
    """
    # The search text is bound as a parameter so quotes in it cannot break
    # or alter the statement.
    sql = "SELECT {} FROM SCORE WHERE NAME LIKE ?;".format(", ".join(SCORE_FEATURE_LIST))
    data = self.database.execute(sql, (f"%{name}%",))
    return [dict(zip(SCORE_FEATURE_LIST, row)) for row in data]
def dump_person(self, save_dir):
    """Export every person to an Excel file.

    Args:
        save_dir: Path of the Excel file to write.
    """
    data = self.select_all_person()
    # Selecting the columns by name fixes their order to match
    # PERSON_TABLE_COLUMNS and still yields a header-only sheet when the
    # table is empty.
    df = pd.DataFrame(data, columns=PERSON_FEATURE_LIST)
    df.to_excel(save_dir, index=False, header=PERSON_TABLE_COLUMNS)
def insert_many_score_from_xlsx(self, xlsx_file_dir):
    """Import score rows from an Excel sheet.

    The sheet must contain the SCORE_TABLE_COLUMNS headers.

    Returns:
        OK on success, ID_ERROR on an integrity error.
    """
    sheet = pd.read_excel(xlsx_file_dir)
    rows = sheet[SCORE_TABLE_COLUMNS].values.tolist()
    column_sql = ", ".join(SCORE_FEATURE_LIST)
    placeholder_sql = ", ".join("?" * len(SCORE_FEATURE_LIST))
    sql = "INSERT INTO SCORE ({}) VALUES ({});".format(column_sql, placeholder_sql)
    try:
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
def insert_many_score_from_json(self, data):
    """Insert several score rows given as a list of dicts.

    Args:
        data: Iterable of mappings, each containing every
            SCORE_FEATURE_LIST key.

    Returns:
        OK on success, ID_ERROR on an integrity error.
    """
    column_sql = ", ".join(SCORE_FEATURE_LIST)
    placeholder_sql = ", ".join("?" * len(SCORE_FEATURE_LIST))
    sql = "INSERT INTO SCORE ({}) VALUES ({});".format(column_sql, placeholder_sql)
    try:
        rows = []
        for score in data:
            rows.append([score[field] for field in SCORE_FEATURE_LIST])
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
def delete_a_score(self, _id):
    """Delete all score rows of one person.

    Returns:
        OK on success, NO_OBJ if the person has no score rows,
        UNKNOWN_ERROR if the delete raised.
    """
    found = self.database.execute("SELECT * FROM SCORE WHERE id = ?", (_id,))
    if not found:
        return NO_OBJ
    try:
        self.database.execute("DELETE FROM SCORE WHERE id = ?", (_id,))
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    return OK
def clear_all_score(self):
    """Delete every row of the SCORE table.

    Returns:
        OK on success, UNKNOWN_ERROR if the delete raised.
    """
    try:
        self.database.execute("DELETE FROM SCORE;")
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    return OK
def delete_all_score(self):
    """Delete every row of the SCORE table (same effect as clear_all_score).

    Returns:
        OK on success, UNKNOWN_ERROR if the delete raised.
    """
    statement = "DELETE FROM SCORE;"
    try:
        self.database.execute(statement)
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    return OK
def select_a_score(self, person_id, score_type, batch=None):
    """Look up a person's score rows for one exercise.

    Args:
        person_id: Person id.
        score_type: Exercise identifier.
        batch: Batch name; ``None`` matches rows whose batch is NULL.

    Returns:
        A non-empty list of dicts keyed by SCORE_FEATURE_LIST, ``None``
        when nothing matches, or UNKNOWN_ERROR if the query raised.
    """
    if batch is not None:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch = ? AND score_type = ?;"
        params = (person_id, batch, score_type)
    else:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch IS NULL AND score_type = ?;"
        params = (person_id, score_type)
    try:
        rows = self.database.execute(sql, params)
        matches = [
            {field: row[index] for index, field in enumerate(SCORE_FEATURE_LIST)}
            for row in rows
        ]
        return matches or None
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
def insert_a_score(self, person_id, name, person_class, score_type, record, score, batch=None):
    """Insert one score row.

    When ``batch`` is ``None`` the batch column is left out of the INSERT.

    Returns:
        OK on success, UNKNOWN_ERROR if the insert raised.
    """
    if batch is not None:
        sql = "INSERT INTO SCORE (id, name, class, batch, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?, ?);"
        params = (person_id, name, person_class, batch, score_type, record, score)
    else:
        sql = "INSERT INTO SCORE (id, name, class, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?);"
        params = (person_id, name, person_class, score_type, record, score)
    try:
        self.database.execute(sql, params)
        self.database.commit()
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
def update_timekeeping_score(self, record, score, person_id, score_type, batch=None):
    """Update the record and score of a timed exercise.

    Nothing is written when ``record`` is ``None`` or ``-1``.

    Args:
        record: Measured time.
        score: Score derived from the record.
        person_id: Person id.
        score_type: Exercise identifier.
        batch: Batch name; ``None`` targets rows whose batch is NULL, the
            same convention select_a_score() and insert_a_score() use.

    Returns:
        OK on success, ID_ERROR on an integrity error, UNKNOWN_ERROR on any
        other exception, ``None`` when the record was rejected.
    """
    if record is None or record == -1:
        return None
    if batch is None:
        # Without a batch filter the update would overwrite this person's
        # result in every batch.
        sql = "UPDATE SCORE SET record = ?, score = ? " \
              "WHERE id = ? AND batch IS NULL AND score_type = ?;"
        data = (record, score, person_id, score_type)
    else:
        sql = "UPDATE SCORE SET record = ?, score = ? " \
              "WHERE id = ? AND batch = ? AND score_type = ?;"
        data = (record, score, person_id, batch, score_type)
    try:
        self.database.execute(sql, data)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
def update_counting_score(self, record, score, person_id, score_type, batch=None):
    """Update the record and score of a counted exercise.

    Nothing is written when ``record`` is falsy, except that ``0`` is
    accepted as a valid count.

    Args:
        record: Counted repetitions.
        score: Score derived from the record.
        person_id: Person id.
        score_type: Exercise identifier.
        batch: Batch name; ``None`` targets rows whose batch is NULL, the
            same convention select_a_score() and insert_a_score() use.

    Returns:
        OK on success, ID_ERROR on an integrity error, UNKNOWN_ERROR on any
        other exception, ``None`` when the record was rejected.
    """
    if not (record or record == 0):
        return None
    if batch is None:
        # Without a batch filter the update would overwrite this person's
        # result in every batch.
        sql = "UPDATE SCORE SET record = ?, score = ? " \
              "WHERE id = ? AND batch IS NULL AND score_type = ?;"
        data = (record, score, person_id, score_type)
    else:
        sql = "UPDATE SCORE SET record = ?, score = ? " \
              "WHERE id = ? AND batch = ? AND score_type = ?;"
        data = (record, score, person_id, batch, score_type)
    try:
        self.database.execute(sql, data)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
def select_all_score(self, batch=None):
    """Return every person merged with their per-exercise results.

    Args:
        batch: If truthy, only scores of this batch are merged; otherwise
            all SCORE rows are used (later rows overwrite earlier ones for
            the same person and exercise).

    Returns:
        A list of person dicts (see select_all_person) extended with
        ``<prefix>_score`` / ``<prefix>_count`` for every exercise (``''``
        when there is no result) and ``final_score``, the sum of all
        ``*_score`` values rounded to two decimals. ``running_count`` is
        rendered through format_time().
    """
    columns = ", ".join(SCORE_FEATURE_LIST)
    if batch:
        rows = self.database.execute(
            "SELECT {} FROM SCORE WHERE batch = ?;".format(columns), (batch,)
        )
    else:
        rows = self.database.execute("SELECT {} FROM SCORE;".format(columns))
    person_data = self.select_all_person()

    # Two exercises are exported under a prefix that differs from their
    # score_type identifier.
    prefix_of = {RUNAROUND: 'run_bf', OVERHANG: 'hanging'}

    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            prefix = prefix_of.get(exercise, exercise)
            person[prefix + '_score'] = ''
            person[prefix + '_count'] = ''

    # Index persons once instead of scanning the list for every score row.
    person_by_id = {person['id']: person for person in person_data}
    for row in rows:
        score_row = dict(zip(SCORE_FEATURE_LIST, row))
        person = person_by_id.get(score_row['id'])
        score_type = score_row['score_type']
        if person is None or score_type is None:
            continue
        prefix = prefix_of.get(score_type, score_type)
        score = score_row['score']
        record = score_row['record']
        # NULL values stay blank instead of crashing float() below.
        person[prefix + '_score'] = '' if score is None else score
        person[prefix + '_count'] = (
            '' if record is None or record == '' else round(float(record), 2)
        )

    for person in person_data:
        total = 0.0
        for key, value in person.items():
            if key == 'running_count' and value != '':
                # Replacing the value of an existing key is safe while
                # iterating; the dict does not change size.
                person[key] = self.format_time(float(value))
            elif key.endswith('_score') and value != '':
                total += float(value)
        person['final_score'] = round(total, 2)
    return person_data
def select_score(self):
    """Return the whole SCORE table.

    Returns:
        A list of dicts keyed by SCORE_FEATURE_LIST.
    """
    sql = "SELECT {} FROM SCORE;".format(", ".join(SCORE_FEATURE_LIST))
    rows = self.database.execute(sql)
    return [dict(zip(SCORE_FEATURE_LIST, row)) for row in rows]
def get_a_class_score(self, batch=None, _class=None, name=None):
    """Return persons merged with their results, optionally filtered.

    Args:
        batch: Only merge scores of this batch when given.
        _class: Only include persons/scores of this class when given.
        name: Only include persons/scores whose name contains this text.

    Returns:
        Whatever select_score_manager_pyqt() returns for the same filters.
    """
    # This method used to duplicate select_score_manager_pyqt() line for
    # line, except that it fetched the persons through
    # select_class_person(_class=..., name=...), which takes no ``name``
    # argument and therefore always raised TypeError.
    return self.select_score_manager_pyqt(_class=_class, batch=batch, name=name)
def dump_score(self, save_dir):
    """Export every person with their results to an Excel file.

    Args:
        save_dir: Path of the Excel file to write.
    """
    data = self.select_all_score()
    # Selecting the columns by name keeps them aligned with
    # SCORE_TYPE_TABLE_COLUMNS even if a row carries extra score types, and
    # still yields a header-only sheet when there are no persons.
    df = pd.DataFrame(data, columns=SCORE_TYPE_FEATURE_LIST)
    df.to_excel(save_dir, index=False, header=SCORE_TYPE_TABLE_COLUMNS)
def format_time(self, seconds):
    """Format a number of seconds as ``HH:MM:SS``.

    Fractions of a second are truncated; negative durations get a leading
    ``-``. ``seconds`` may be anything ``float()`` accepts.
    """
    value = float(seconds)
    whole_minutes, secs = divmod(abs(value), 60)
    hours, minutes = divmod(whole_minutes, 60)
    sign = "-" if value < 0 else ""
    return sign + "{:02d}:{:02d}:{:02d}".format(int(hours), int(minutes), int(secs))
def select_person_manager_pyqt(self, _class, name):
    """Search persons by class and/or name, with NULL fields shown as ``''``.

    Args:
        _class: Exact class to match, or ``None`` for any class.
        name: Substring of the name to match, or ``None`` for any name.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST.
    """
    if _class is None:
        # Always-true placeholder so the WHERE clause is never empty.
        conditions, params = ["1 = 1"], []
    else:
        conditions, params = ["class = ?"], [_class]
    if name is not None:
        conditions.append("name LIKE ?")
        params.append(f'%{name}%')
    sql = "SELECT {} FROM PERSON WHERE {};".format(
        ", ".join(PERSON_FEATURE_LIST),
        " AND ".join(conditions)
    )
    people = []
    for row in self.database.execute(sql, tuple(params)):
        people.append({
            field: ('' if value is None else value)
            for field, value in zip(PERSON_FEATURE_LIST, row)
        })
    return people
def select_score_manager_pyqt(self, _class=None, batch=None, name=None):
    """Return persons merged with their results, optionally filtered.

    Args:
        _class: Only include persons/scores of this class when given.
        batch: Only merge scores of this batch when given.
        name: Only include persons/scores whose name contains this text.

    Returns:
        A list of person dicts (see select_person) extended with
        ``<prefix>_score`` / ``<prefix>_count`` for every exercise (``''``
        when there is no result) and ``final_score``, the sum of all
        ``*_score`` values rounded to two decimals. ``running_count`` is
        rendered through format_time().
    """
    conditions = []
    params = []
    if batch is not None:
        conditions.append("batch = ?")
        params.append(batch)
    if _class is not None:
        conditions.append("class = ?")
        params.append(_class)
    if name is not None:
        conditions.append("NAME LIKE ?")
        params.append(f'%{name}%')
    sql = "SELECT {} FROM SCORE".format(", ".join(SCORE_FEATURE_LIST))
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    sql += ";"
    rows = self.database.execute(sql, tuple(params))
    person_data = self.select_person(_class=_class, name=name)

    # Two exercises are exported under a prefix that differs from their
    # score_type identifier.
    prefix_of = {RUNAROUND: 'run_bf', OVERHANG: 'hanging'}

    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            prefix = prefix_of.get(exercise, exercise)
            person[prefix + '_score'] = ''
            person[prefix + '_count'] = ''

    # Index persons once instead of scanning the list for every score row.
    person_by_id = {person['id']: person for person in person_data}
    for row in rows:
        score_row = dict(zip(SCORE_FEATURE_LIST, row))
        person = person_by_id.get(score_row['id'])
        score_type = score_row['score_type']
        if person is None or score_type is None:
            continue
        prefix = prefix_of.get(score_type, score_type)
        score = score_row['score']
        record = score_row['record']
        # NULL values stay blank instead of crashing float() below.
        person[prefix + '_score'] = '' if score is None else score
        person[prefix + '_count'] = (
            '' if record is None or record == '' else round(float(record), 2)
        )

    for person in person_data:
        total = 0.0
        for key, value in person.items():
            if key == 'running_count' and value != '':
                # Replacing the value of an existing key is safe while
                # iterating; the dict does not change size.
                person[key] = self.format_time(float(value))
            elif key.endswith('_score') and value != '':
                total += float(value)
        person['final_score'] = round(total, 2)
    return person_data
def select_person(self, _class=None, name=None):
    """Search persons by class and/or name, returning raw column values.

    Args:
        _class: Exact class to match, or ``None`` for any class.
        name: Substring of the name to match, or ``None`` for any name.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST; NULL fields are
        ``None``.
    """
    if _class is None:
        # Always-true placeholder so the WHERE clause is never empty.
        conditions, params = ["1 = 1"], []
    else:
        conditions, params = ["class = ?"], [_class]
    if name is not None:
        conditions.append("name LIKE ?")
        params.append(f'%{name}%')
    sql = "SELECT {} FROM PERSON WHERE {};".format(
        ", ".join(PERSON_FEATURE_LIST),
        " AND ".join(conditions)
    )
    rows = self.database.execute(sql, tuple(params))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]