LISHUZUOXUN_yangjiang/Database/database_mgr.py

1092 lines
38 KiB
Python
Raw Normal View History

2024-09-23 14:54:15 +08:00
import os
import sqlite3
import time
import traceback
import pandas as pd
from AcrossPlatform.get_platform import GLOBAL_DIR
from SQLDataBase.sql_server import SQLServer
# Directory handed to SQLServer as server_dir; lives under the platform-specific GLOBAL_DIR.
MANAGER_DIR = os.path.join(GLOBAL_DIR, "Manager")
# Database file name handed to SQLServer as server_name.
MANAGER_DATABASE = "complete_database.db"
# Person attributes: column names of the PERSON table.
ID = "id"
NAME = "name"
BAND_ID = "band_id"
CLASS = "class"
GENDER = "gender"
AGE = "age"
HEIGHT = "height"
WEIGHT = "weight"
BMI = "bmi"
PBF = "pbf"
PERSON_TYPE = "person_type"
PERSON_FEATURE_LIST = [
    ID, NAME, BAND_ID, CLASS, GENDER, AGE, HEIGHT, WEIGHT, BMI, PBF, PERSON_TYPE,
]
# Spreadsheet headers used when importing persons; same order as PERSON_FEATURE_LIST.
PERSON_TABLE_COLUMNS = [
    "人员编号", "姓名", "手环", "所在班", "性别", "年龄", "身高", "体重", "BMI", "PBF", "人员类型",
]

# Score attributes.
BATCH = "batch"
SCORE_TYPE = "score_type"
RECORD = "record"
SCORE = "score"
COUNT = 'count'
GRADE = "grade"

# Exercise (subject) identifiers stored in SCORE.score_type.
PUSHUP = "pushup"
SITUP = "situp"
OVERHANG = "overhang"
RUNAROUND = "runaround"
RUNNING = "running"
PULLUP = "pullup"
TRICEPDIP = 'tricepdip'

# Exercise identifier -> display name.
EXERCISE_NAME_LIST = {
    PUSHUP: "俯卧撑",
    SITUP: "仰卧起坐",
    OVERHANG: "曲臂悬垂",
    RUNAROUND: "30*2蛇形跑",
    RUNNING: "长跑",
    PULLUP: "引体向上",
    TRICEPDIP: "双杠臂屈伸",
}
# Grade identifiers.
GRADE_GREAT = "GREAT"
GRADE_GOOD = "GOOD"
GRADE_PASS = "PASS"
GRADE_FAIL = "FAIL"
GRADE_MISS = "MISS"
# Grade identifier -> display name.
GRADE_LIST = {
    GRADE_GREAT: "优秀",
    GRADE_GOOD: "良好",
    GRADE_PASS: "及格",
    GRADE_FAIL: "不及格",
    GRADE_MISS: "缺考",
}
# Grades ordered from best to worst.
GRADE_RANK = [
    GRADE_GREAT,
    GRADE_GOOD,
    GRADE_PASS,
    GRADE_FAIL,
    GRADE_MISS,
]

# Keys used in the statistic result dictionaries.
DATA = "data"
PERCENTAGE = "percentage"
BATCH_AVG = "batch_avg"
TYPE_AVG = "type_avg"


def get_grade(score):
    """Map a numeric score to a grade identifier.

    Args:
        score: numeric score, or ``None`` for a missing (NULL) score.

    Returns:
        ``GRADE_GREAT`` for ``score >= 90``, ``GRADE_GOOD`` for ``[70, 90)``,
        ``GRADE_PASS`` for ``[60, 70)``, ``GRADE_FAIL`` for ``[0, 60)`` and
        ``GRADE_MISS`` for anything else (negative values, NaN, ``None``).
    """
    # A NULL score cannot be compared with a number; treat it as "missed".
    if score is None:
        return GRADE_MISS
    if score >= 90:
        return GRADE_GREAT
    if score >= 70:
        return GRADE_GOOD
    if score >= 60:
        return GRADE_PASS
    if score >= 0:
        return GRADE_FAIL
    # Negative values and NaN fail every comparison above.
    return GRADE_MISS
# Columns of the SCORE table, in table order.
SCORE_FEATURE_LIST = [ID, NAME, CLASS, BATCH, SCORE_TYPE, RECORD, SCORE]
# Spreadsheet headers used when importing scores; same order as SCORE_FEATURE_LIST.
SCORE_TABLE_COLUMNS = ["人员编号", "姓名", "所在班", "批次", "课目", "记录", "成绩"]

# Status codes returned by the Database methods.
OK = 0
ID_ERROR = 1
NO_OBJ = 2
UNKNOWN_ERROR = -1
class Database:
def __init__(self) -> None:
    """Open the manager database and make sure the three tables exist."""
    super().__init__()
    self.database = SQLServer(server_name=MANAGER_DATABASE, server_dir=MANAGER_DIR)
    # Each creator issues CREATE TABLE IF NOT EXISTS, so repeated start-ups are harmless.
    for create_table in (
        self._create_person_table,
        self._create_score_table,
        self._create_batch_table,
    ):
        create_table()
# 创建人员索引表
def _create_person_table(self):
sql = "CREATE TABLE IF NOT EXISTS PERSON (" \
"id CHAR(100) PRIMARY KEY," \
"name CHAR(100)," \
"band_id TEXT," \
"class TEXT," \
"gender CHAR(20)," \
"age INT," \
"height INT," \
"weight INT," \
"bmi FLOAT," \
"pbf FLOAT," \
"person_type INT" \
");"
self.database.execute(sql)
def _create_score_table(self):
sql = "CREATE TABLE IF NOT EXISTS SCORE (" \
"id CHAR(100)," \
"name CHAR(100)," \
"class TEXT," \
"batch TEXT," \
"score_type TEXT," \
"record TEXT," \
"score FLOAT" \
");"
self.database.execute(sql)
def _create_batch_table(self):
sql = "CREATE TABLE IF NOT EXISTS BATCH_INFO (" \
"batch TEXT," \
"start_time INT" \
");"
self.database.execute(sql)
# 获得当前批次
def get_this_batch(self):
    """Return the label of the most recently started batch.

    Returns:
        The ``batch`` value of the BATCH_INFO row with the largest
        ``start_time``, or ``None`` when the table is empty or the query fails.
    """
    sql = "SELECT batch FROM BATCH_INFO ORDER BY start_time DESC LIMIT 1"
    try:
        # The query selects a single column, so read it by position instead of
        # routing it through the unrelated SCORE column names.
        for row in self.database.execute(sql):
            return row[0]
        return None
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
# 新增一个批次
def generate_batch(self):
    """Return the batch label for the current hour, registering it if new.

    The label is built from the local time with hour resolution. A BATCH_INFO
    row is inserted (and committed) only when no row with that label exists.

    Returns:
        The batch label, or ``None`` when a database error occurs.
    """
    now = int(time.time())
    # NOTE(review): the format has no separator between day and hour; existing
    # rows use this exact label, so it is kept unchanged.
    label = time.strftime("%Y年%m月%d%H时", time.localtime(now))
    lookup_sql = "SELECT batch, start_time FROM BATCH_INFO WHERE batch = ?;"
    insert_sql = "INSERT INTO BATCH_INFO (batch, start_time) VALUES (?, ?);"
    try:
        already_there = self.database.execute(lookup_sql, (label,))
        if already_there:
            return label
        self.database.execute(insert_sql, (label, now))
        self.database.commit()
        return label
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_score(self):
    """Return every SCORE row as a dict keyed by ``SCORE_FEATURE_LIST``.

    Returns:
        A list of dicts, or ``None`` when the query fails.
    """
    query = "SELECT {} FROM SCORE".format(", ".join(SCORE_FEATURE_LIST))
    try:
        rows = self.database.execute(query)
        return [dict(zip(SCORE_FEATURE_LIST, row)) for row in rows]
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_batch(self):
    """Return the distinct batch labels of BATCH_INFO ordered by ``start_time``.

    Returns:
        A list of labels, or ``None`` when the query fails.
    """
    query = "SELECT {0} FROM BATCH_INFO GROUP BY {0} ORDER BY start_time".format(BATCH)
    try:
        labels = []
        for row in self.database.execute(query):
            labels.append(row[0])
        return labels
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_id(self):
    """Return the sorted list of distinct person ids in PERSON.

    Returns:
        A sorted list of ids, or ``None`` when the query (or the sort) fails.
    """
    query = "SELECT {0} FROM PERSON GROUP BY {0}".format(ID)
    try:
        rows = self.database.execute(query)
        return sorted(row[0] for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_name(self):
    """Return a mapping ``{person id: name}`` for every row in PERSON.

    Returns:
        A dict ordered by id, or ``None`` when the query fails.
    """
    # No GROUP BY on name: grouping collapsed persons who share a name into a
    # single row, so the other ids were missing from the mapping.
    sql = f"SELECT {ID}, {NAME} FROM PERSON ORDER BY {ID}"
    try:
        data = self.database.execute(sql)
        return {row[0]: row[1] for row in data}
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_id_class(self):
    """Return a mapping ``{person id: class}`` for every row in PERSON.

    Returns:
        A dict ordered by id, or ``None`` when the query fails.
    """
    query = "SELECT {0}, {1} FROM PERSON ORDER BY {0}".format(ID, CLASS)
    try:
        mapping = {}
        for person_id, person_class in ((row[0], row[1]) for row in self.database.execute(query)):
            mapping[person_id] = person_class
        return mapping
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
def get_all_class(self):
    """Return the sorted list of distinct class values in PERSON.

    Returns:
        A sorted list, or ``None`` when the query (or the sort) fails.
    """
    query = "SELECT {0} FROM PERSON GROUP BY {0}".format(CLASS)
    try:
        rows = self.database.execute(query)
        return sorted(row[0] for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
@staticmethod
def get_all_exercise_types():
    """Return the exercise identifiers (the keys of ``EXERCISE_NAME_LIST``) as a list."""
    return [exercise for exercise in EXERCISE_NAME_LIST]
# 获得各课目训练汇总
def get_exercise_statistic(self, type_list, batch):
    """Summarise, per exercise type, which persons reached which grade.

    Args:
        type_list: exercise identifiers (``score_type`` values) to report on.
        batch: batches to include; a score row is used when its ``batch``
            value is ``in`` this argument.

    Returns:
        ``{type: {grade: {DATA: [person entries], PERCENTAGE: share}}}``.
        Every person in PERSON appears once per type; persons without a score
        are listed under ``GRADE_MISS`` with score 0. ``PERCENTAGE`` is the
        bucket size divided by the number of persons. On an unexpected error
        the table built so far is returned.
    """
    # One bucket per exercise type and grade.
    grade_table = {
        _type: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for _type in type_list
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()

        # Keep only the requested exercise types and batches.
        selected_data = [
            row for row in full_data
            if row[SCORE_TYPE] in type_list and row[BATCH] in batch
        ]

        # Everybody starts as "missed" for every type.
        person_group = {}
        for _id in all_id:
            person_group.setdefault(_id, {
                _type: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for _type in type_list
            })

        # Fill in the recorded results.
        for data in selected_data:
            person = person_group.get(data[ID])
            if person is None:
                # Score row whose id is not in PERSON: skip it rather than
                # letting a KeyError abort the whole report.
                continue
            entry = person[data[SCORE_TYPE]]
            score = data[SCORE]
            # A NULL score counts as missed instead of raising in get_grade.
            entry[GRADE] = GRADE_MISS if score is None else get_grade(score)
            entry[SCORE] = 0 if score is None else score
            entry[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            for _type, type_values in id_values.items():
                grade_table[_type][type_values[GRADE]][DATA].append({
                    ID: _id,
                    NAME: all_name.get(_id),
                    SCORE: type_values[SCORE],
                    RECORD: type_values[RECORD],
                })

        # Share of all persons per bucket; stays 0 when there are no persons.
        person_count = len(all_id)
        for type_values in grade_table.values():
            for grade_values in type_values.values():
                if person_count:
                    grade_values[PERCENTAGE] = len(grade_values[DATA]) / person_count
    except Exception as e:
        print(e.args, traceback.format_exc())
    return grade_table
# 各班级科目汇总
def get_class_statistic(self, batch, _type):
    """Summarise, per class, which persons reached which grade in one exercise.

    Args:
        batch: batch label; only score rows with exactly this batch are used.
        _type: exercise identifier (``score_type``) to report on.

    Returns:
        ``{class: {grade: {DATA: [person entries], PERCENTAGE: share}}}``.
        Every person is listed under their class from PERSON; persons without
        a matching score fall under ``GRADE_MISS``. ``PERCENTAGE`` is the
        bucket size divided by the number of persons in that class. On an
        unexpected error the table built so far is returned.
    """
    # A failed class lookup yields an empty report instead of a crash.
    all_class = self.get_all_class() or []
    class_table = {
        _class: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for _class in all_class
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        all_id_class = self.get_id_class()

        selected_data = [
            row for row in full_data
            if row[SCORE_TYPE] == _type and row[BATCH] == batch
        ]

        # The class comes from PERSON for everybody. Previously it was only
        # filled in for persons with a score, so anyone who missed the test
        # ended up under class None and aborted the report.
        person_group = {
            _id: {
                GRADE: GRADE_MISS,
                SCORE: 0,
                RECORD: None,
                CLASS: all_id_class.get(_id),
            }
            for _id in all_id
        }

        for data in selected_data:
            person = person_group.get(data[ID])
            if person is None:
                # Score row whose id is not in PERSON.
                continue
            score = data[SCORE]
            person[GRADE] = GRADE_MISS if score is None else get_grade(score)
            person[SCORE] = 0 if score is None else score
            person[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            grades = class_table.get(id_values[CLASS])
            if grades is None:
                # Class not present in the report table.
                continue
            grades[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD],
            })

        # Share within each class; the class total is computed once per class.
        for class_values in class_table.values():
            class_total = sum(len(bucket[DATA]) for bucket in class_values.values())
            if not class_total:
                continue
            for grade_values in class_values.values():
                grade_values[PERCENTAGE] = len(grade_values[DATA]) / class_total
    except Exception as e:
        print(e.args, traceback.format_exc())
    return class_table
# 集体科目训练汇总
def get_group_statistic(self, _type):
    """Summarise, per batch, which persons reached which grade in one exercise.

    Args:
        _type: exercise identifier (``score_type``) to report on.

    Returns:
        ``{batch: {grade: {DATA: [person entries], PERCENTAGE: share}}}`` for
        every batch in BATCH_INFO. Every person appears once per batch;
        persons without a score fall under ``GRADE_MISS``. ``PERCENTAGE`` is
        the bucket size divided by the number of entries in that batch. On an
        unexpected error the table built so far is returned.
    """
    # A failed batch lookup yields an empty report instead of a crash.
    all_batch = self.get_all_batch() or []
    batch_table = {
        batch: {
            grade: {DATA: [], PERCENTAGE: 0}
            for grade in GRADE_LIST
        }
        for batch in all_batch
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()

        selected_data = [row for row in full_data if row[SCORE_TYPE] == _type]

        # Everybody starts as "missed" in every batch.
        person_group = {
            _id: {
                batch: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for batch in all_batch
            }
            for _id in all_id
        }

        for data in selected_data:
            person = person_group.get(data[ID])
            entry = None if person is None else person.get(data[BATCH])
            if entry is None:
                # Unknown person, or a batch that is not in BATCH_INFO
                # (e.g. NULL): skip instead of aborting the whole report.
                continue
            score = data[SCORE]
            entry[GRADE] = GRADE_MISS if score is None else get_grade(score)
            entry[SCORE] = 0 if score is None else score
            entry[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            for batch, batch_values in id_values.items():
                batch_table[batch][batch_values[GRADE]][DATA].append({
                    ID: _id,
                    NAME: all_name.get(_id),
                    SCORE: batch_values[SCORE],
                    RECORD: batch_values[RECORD],
                })

        # Share within each batch; the batch total is computed once per batch.
        for batch_values in batch_table.values():
            batch_total = sum(len(bucket[DATA]) for bucket in batch_values.values())
            if not batch_total:
                continue
            for grade_values in batch_values.values():
                grade_values[PERCENTAGE] = len(grade_values[DATA]) / batch_total
    except Exception as e:
        print(e.args, traceback.format_exc())
    return batch_table
# 集体统计
def get_group_multiple_statistic(self, _type, class_list, batch):
    """Group the persons of the given classes by grade for one exercise and batch.

    Args:
        _type: exercise identifier (``score_type``) to report on.
        class_list: classes to include.
        batch: batch label; only score rows with exactly this batch are used.

    Returns:
        ``{grade: {DATA: [person entries]}}``. Persons of the selected classes
        without a matching score fall under ``GRADE_MISS``. On an unexpected
        error the table built so far is returned.
    """
    table = {grade: {DATA: []} for grade in GRADE_LIST}
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        id_class = self.get_id_class()

        selected_data = [
            row for row in full_data
            if row[SCORE_TYPE] == _type
            and row[CLASS] in class_list
            and row[BATCH] == batch
        ]

        # Only persons whose class (per PERSON) is selected take part.
        person_group = {
            _id: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
            for _id in all_id
            if id_class.get(_id) in class_list
        }

        for data in selected_data:
            person = person_group.get(data[ID])
            if person is None:
                # The class stored on the score row matched, but the person is
                # gone or now belongs to another class: skip the row instead
                # of aborting the whole report.
                continue
            score = data[SCORE]
            person[GRADE] = GRADE_MISS if score is None else get_grade(score)
            person[SCORE] = 0 if score is None else score
            person[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            table[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD],
            })
    except Exception as e:
        print(e.args, traceback.format_exc())
    return table
# 个人统计
def get_individual_statistic(self, _id, _type_list, batch_list):
    """Collect one person's scores per batch and exercise type.

    Args:
        _id: person id.
        _type_list: exercise identifiers to include.
        batch_list: batch labels to include.

    Returns:
        A dict with three entries:
        ``BATCH``: ``{batch: {type: {SCORE: value}}}`` (0 when no score exists),
        ``BATCH_AVG``: ``{batch: sum of scores / len(_type_list)}``,
        ``TYPE_AVG``: ``{type: highest score over the batches}`` (despite the
        key name this is a maximum, not an average).
        On an unexpected error the structure built so far is returned.
    """
    group = {
        BATCH: {},
        BATCH_AVG: {batch: 0 for batch in batch_list},
        TYPE_AVG: {_type: 0 for _type in _type_list},
    }
    for batch in batch_list:
        group[BATCH].setdefault(batch, {
            _type: {SCORE: 0}
            for _type in _type_list
        })
    try:
        full_data = self.get_all_score()
        selected_data = [
            row for row in full_data
            if row[ID] == _id
            and row[SCORE_TYPE] in _type_list
            and row[BATCH] in batch_list
        ]

        for data in selected_data:
            if data[SCORE] is None:
                # A NULL score keeps the default 0; storing None would break
                # the sum/max below and abort the whole statistic.
                continue
            group[BATCH][data[BATCH]][data[SCORE_TYPE]][SCORE] = data[SCORE]

        type_count = len(_type_list)
        for batch, batch_values in group[BATCH].items():
            if type_count:
                batch_total = sum(entry[SCORE] for entry in batch_values.values())
                group[BATCH_AVG][batch] = batch_total / type_count
            for _type, type_values in batch_values.items():
                group[TYPE_AVG][_type] = max(type_values[SCORE], group[TYPE_AVG][_type])
    except Exception as e:
        print(e.args, traceback.format_exc())
    return group
# 插入信息
def insert_a_info(self, data):
    """Insert one PERSON row and one SCORE row built from the same mapping.

    Keys missing from ``data`` are inserted as NULL.

    Args:
        data: mapping with ``PERSON_FEATURE_LIST`` / ``SCORE_FEATURE_LIST`` keys.

    Returns:
        ``OK`` on success, ``ID_ERROR`` on an integrity violation,
        ``UNKNOWN_ERROR`` on any other failure.
    """
    person_values = [data.get(column) for column in PERSON_FEATURE_LIST]
    person_sql = "INSERT INTO PERSON ({}) VALUES ({});".format(
        ", ".join(PERSON_FEATURE_LIST),
        ", ".join("?" * len(PERSON_FEATURE_LIST)),
    )
    score_values = [data.get(column) for column in SCORE_FEATURE_LIST]
    score_sql = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    try:
        self.database.execute(person_sql, person_values)
        self.database.execute(score_sql, score_values)
        # NOTE(review): no commit is issued here (it was disabled in the
        # original); confirm that the caller commits.
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
# 从Excel导入一个数据
def insert_many_person_from_xlsx(self, xlsx_file_name):
    """Import persons from an Excel sheet with the ``PERSON_TABLE_COLUMNS`` headers.

    For every imported person a SCORE row is inserted as well; it carries the
    id, name and class, and NULL for the remaining score columns.

    Args:
        xlsx_file_name: path of the Excel file.

    Returns:
        ``OK`` on success, ``ID_ERROR`` on an integrity violation. Other
        exceptions propagate.
    """
    sheet = pd.read_excel(xlsx_file_name)[PERSON_TABLE_COLUMNS]
    person_rows = sheet.values.tolist()
    person_sql = "INSERT INTO PERSON ({}) VALUES ({});".format(
        ", ".join(PERSON_FEATURE_LIST),
        ", ".join("?" * len(PERSON_FEATURE_LIST)),
    )
    score_sql = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    # Name the cells so the score columns can be picked by key.
    people = [dict(zip(PERSON_FEATURE_LIST, row)) for row in person_rows]
    try:
        self.database.executemany(person_sql, person_rows)
        score_rows = [
            [person.get(column) for column in SCORE_FEATURE_LIST]
            for person in people
        ]
        self.database.executemany(score_sql, score_rows)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
# 从json导入一个数据
def insert_many_person_from_json(self, data):
    """Insert several persons given as mappings.

    Args:
        data: iterable of mappings; each must contain every
            ``PERSON_FEATURE_LIST`` key.

    Returns:
        ``OK`` on success, ``ID_ERROR`` on an integrity violation. Other
        exceptions (e.g. a missing key) propagate.
    """
    statement = "INSERT INTO PERSON ({}) VALUES ({});".format(
        ", ".join(PERSON_FEATURE_LIST),
        ", ".join("?" * len(PERSON_FEATURE_LIST)),
    )
    try:
        values = []
        for person in data:
            values.append([person[column] for column in PERSON_FEATURE_LIST])
        self.database.executemany(statement, values)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
# 删除人员信息
def delete_a_person(self, _id):
    """Delete a person and all of that person's score rows.

    Args:
        _id: person id.

    Returns:
        ``NO_OBJ`` when no PERSON row has this id, ``OK`` after a successful
        delete, ``UNKNOWN_ERROR`` when the delete fails.
    """
    found = self.database.execute("SELECT * FROM PERSON WHERE id = ?", (_id,))
    if not found:
        return NO_OBJ
    try:
        for statement in (
            "DELETE FROM PERSON WHERE id = ?",
            "DELETE FROM SCORE WHERE id = ?",
        ):
            self.database.execute(statement, (_id,))
        self.database.commit()
        return OK
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
# 删除所有人员信息
def delete_all_person(self):
    """Remove every row from PERSON and SCORE.

    Returns:
        ``OK`` on success, ``UNKNOWN_ERROR`` on failure.
    """
    try:
        for statement in ("DELETE FROM PERSON", "DELETE FROM SCORE"):
            self.database.execute(statement)
        self.database.commit()
        return OK
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
# 获得某个人员的信息
def select_a_person(self, _id):
    """Return the PERSON rows with the given id as dicts keyed by ``PERSON_FEATURE_LIST``."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM PERSON WHERE id = ?;", (_id,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
# 获得某个人员的信息
def select_a_person_from_band(self, band_id):
    """Return the PERSON rows with the given ``band_id`` as dicts keyed by ``PERSON_FEATURE_LIST``."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM PERSON WHERE band_id = ?;", (band_id,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
# 获取一个班级的所有人员
def get_a_class(self, _class):
    """Return every person of the given class as dicts keyed by ``PERSON_FEATURE_LIST``."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM PERSON WHERE class = ?;", (_class,))
    members = []
    for row in rows:
        members.append(dict(zip(PERSON_FEATURE_LIST, row)))
    return members
# 获得所有人员的信息
def select_all_person(self):
    """Return every PERSON row as a dict keyed by ``PERSON_FEATURE_LIST``."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM PERSON;")
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
# 获得一个班的人员的信息
def select_class_person(self, _class):
    """Return the PERSON rows of one class as dicts keyed by ``PERSON_FEATURE_LIST``."""
    columns = ", ".join(PERSON_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM PERSON WHERE class = ?;", (_class,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
# 修改指定人员的信息
def update_a_person(self, data):
    """Update a person and keep that person's score rows in sync.

    The PERSON row is rewritten with every ``PERSON_FEATURE_LIST`` column
    (keys missing from ``data`` become NULL). In SCORE only the columns that
    are actually present in ``data`` are written, for all rows with this id.

    Args:
        data: mapping with the person's values; must contain ``ID``.

    Returns:
        ``OK`` after the update, ``NO_OBJ`` when no PERSON row has this id.
        Database errors propagate.
    """
    person = self.database.execute("SELECT * FROM PERSON WHERE id = ?", (data[ID],))
    if not person:
        return NO_OBJ
    person_id = data.get(ID)

    person_sql = "UPDATE PERSON SET {} WHERE id = ?".format(
        ", ".join("{}=?".format(feature) for feature in PERSON_FEATURE_LIST)
    )
    person_values = [data.get(column) for column in PERSON_FEATURE_LIST]
    self.database.execute(person_sql, person_values + [person_id])

    # Writing every SCORE column from a person mapping set batch, score_type,
    # record and score to NULL on all of the person's results. Restrict the
    # update to the columns the caller supplied (normally id, name, class).
    score_columns = [column for column in SCORE_FEATURE_LIST if column in data]
    if score_columns:
        score_sql = "UPDATE SCORE SET {} WHERE id = ?".format(
            ", ".join("{}=?".format(feature) for feature in score_columns)
        )
        score_values = [data.get(column) for column in score_columns]
        self.database.execute(score_sql, score_values + [person_id])

    self.database.commit()
    return OK
# 搜索一个名字
def select_a_person_from_name(self, name):
    """Return the persons whose name contains ``name``.

    Args:
        name: text to search for; SQL ``LIKE`` wildcards in it stay active.

    Returns:
        A list of dicts keyed by ``PERSON_FEATURE_LIST``.
    """
    # The search text is bound as a parameter so quotes in a name can neither
    # break nor alter the statement.
    sql = "SELECT {} FROM PERSON WHERE NAME LIKE ?;".format(", ".join(PERSON_FEATURE_LIST))
    data = self.database.execute(sql, ("%{}%".format(name),))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in data]
# 搜索一个名字
def select_a_score_from_name(self, name):
    """Return the score rows whose name contains ``name``.

    Args:
        name: text to search for; SQL ``LIKE`` wildcards in it stay active.

    Returns:
        A list of dicts keyed by ``SCORE_FEATURE_LIST``.
    """
    # The search text is bound as a parameter so quotes in a name can neither
    # break nor alter the statement.
    sql = "SELECT {} FROM SCORE WHERE NAME LIKE ?;".format(", ".join(SCORE_FEATURE_LIST))
    data = self.database.execute(sql, ("%{}%".format(name),))
    return [dict(zip(SCORE_FEATURE_LIST, row)) for row in data]
# 导出所有人员信息
def dump_person(self, save_dir):
    """Write every PERSON row to the Excel file ``save_dir`` (no index column)."""
    frame = pd.DataFrame(self.select_all_person())
    frame.to_excel(save_dir, index=False)
# 从Excel导入成绩
def insert_many_score_from_xlsx(self, xlsx_file_dir):
    """Import score rows from an Excel sheet with the ``SCORE_TABLE_COLUMNS`` headers.

    Args:
        xlsx_file_dir: path of the Excel file.

    Returns:
        ``OK`` on success, ``ID_ERROR`` on an integrity violation. Other
        exceptions propagate.
    """
    sheet = pd.read_excel(xlsx_file_dir)[SCORE_TABLE_COLUMNS]
    score_rows = sheet.values.tolist()
    statement = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    try:
        self.database.executemany(statement, score_rows)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
# 从json导入成绩
def insert_many_score_from_json(self, data):
    """Insert several score rows given as mappings.

    Args:
        data: iterable of mappings; each must contain every
            ``SCORE_FEATURE_LIST`` key.

    Returns:
        ``OK`` on success, ``ID_ERROR`` on an integrity violation. Other
        exceptions (e.g. a missing key) propagate.
    """
    statement = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    try:
        values = []
        for entry in data:
            values.append([entry[column] for column in SCORE_FEATURE_LIST])
        self.database.executemany(statement, values)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
# Delete all score rows of one person
def delete_a_score(self, _id):
    """Delete every SCORE row of the given person id.

    Returns:
        ``NO_OBJ`` when the id has no score rows, ``OK`` after a successful
        delete, ``UNKNOWN_ERROR`` when the delete fails.
    """
    existing = self.database.execute("SELECT * FROM SCORE WHERE id = ?", (_id,))
    if not existing:
        return NO_OBJ
    try:
        self.database.execute("DELETE FROM SCORE WHERE id = ?", (_id,))
        self.database.commit()
        return OK
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
# 删除所有人员成绩
def clear_all_score(self):
    """Remove every row from SCORE.

    Returns:
        ``OK`` on success, ``UNKNOWN_ERROR`` on failure.
    """
    statement = "DELETE FROM SCORE;"
    try:
        self.database.execute(statement)
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    else:
        return OK
# 删除所有人员成绩
def delete_all_score(self):
    """Remove every row from SCORE (same statement as ``clear_all_score``).

    Returns:
        ``OK`` on success, ``UNKNOWN_ERROR`` on failure.
    """
    statement = "DELETE FROM SCORE;"
    try:
        self.database.execute(statement)
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    else:
        return OK
# 查询成绩是否存在
def select_a_score(self, person_id, score_type, batch=None):
    """Look up the score rows of one person for one exercise.

    Args:
        person_id: person id.
        score_type: exercise identifier.
        batch: batch label; ``None`` selects rows whose batch is NULL.

    Returns:
        A non-empty list of dicts keyed by ``SCORE_FEATURE_LIST`` when rows
        exist, ``None`` when nothing matches, and ``UNKNOWN_ERROR`` when the
        query fails. Note that ``UNKNOWN_ERROR`` (-1) is truthy, so callers
        must not rely on a plain truth test to detect a hit.
    """
    if batch is not None:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch = ? AND score_type = ?;"
        params = (person_id, batch, score_type)
    else:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch IS NULL AND score_type = ?;"
        params = (person_id, score_type)
    try:
        matches = []
        for row in self.database.execute(sql, params):
            entry = dict(zip(SCORE_FEATURE_LIST, row))
            # Rows without an exercise type are placeholders, not results.
            if entry['score_type']:
                matches.append(entry)
        return matches if matches else None
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
# 插入一条成绩
def insert_a_score(self, person_id, name, person_class, score_type, record, score, batch=None):
    """Insert a single score row.

    When ``batch`` is ``None`` the batch column is left out of the insert and
    therefore stored as NULL.

    Returns:
        ``OK`` on success, ``UNKNOWN_ERROR`` on failure.
    """
    if batch is not None:
        sql = "INSERT INTO SCORE (id, name, class, batch, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?, ?);"
        params = (person_id, name, person_class, batch, score_type, record, score)
    else:
        sql = "INSERT INTO SCORE (id, name, class, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?);"
        params = (person_id, name, person_class, score_type, record, score)
    try:
        self.database.execute(sql, params)
        self.database.commit()
        return OK
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
# 插入计时类项目成绩
def update_timekeeping_score(self, record, score, person_id, score_type, batch=None):
    """Store the record and score of a timed exercise on existing score rows.

    Nothing is written when ``record`` is ``None`` or ``-1``. When ``batch``
    is ``None`` the statement has no batch condition, so the rows of every
    batch for this person and exercise are updated.

    Returns:
        ``OK`` on success, ``ID_ERROR`` on an integrity violation,
        ``UNKNOWN_ERROR`` on any other failure, ``None`` when the record is
        rejected.
    """
    # NOTE(review): the nesting was reconstructed from unindented source; it
    # assumes the database access sits inside the record check - confirm.
    if record is None or record == -1:
        return None
    if batch is not None:
        sql = (
            "\n"
            "UPDATE SCORE\n"
            "SET record = ?, score = ?\n"
            "WHERE id = ? AND batch = ? AND score_type = ?;\n"
        )
        params = (record, score, person_id, batch, score_type)
    else:
        sql = (
            "\n"
            "UPDATE SCORE\n"
            "SET record = ?, score = ?\n"
            "WHERE id = ? AND score_type = ?;\n"
        )
        params = (record, score, person_id, score_type)
    try:
        self.database.execute(sql, params)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
# 插入计数类项目成绩
def update_counting_score(self, record, score, person_id, score_type, batch=None):
    """Store the record and score of a counted exercise on existing score rows.

    Nothing is written when ``record`` is falsy and not equal to 0 (e.g.
    ``None`` or an empty string). When ``batch`` is ``None`` the statement
    has no batch condition, so the rows of every batch for this person and
    exercise are updated.

    Returns:
        ``OK`` on success, ``ID_ERROR`` on an integrity violation,
        ``UNKNOWN_ERROR`` on any other failure, ``None`` when the record is
        rejected.
    """
    # NOTE(review): the nesting was reconstructed from unindented source; it
    # assumes the database access sits inside the record check - confirm.
    if not (record or record == 0):
        return None
    if batch is not None:
        sql = (
            "\n"
            "UPDATE SCORE\n"
            "SET record = ?, score = ?\n"
            "WHERE id = ? AND batch = ? AND score_type = ?;\n"
        )
        params = (record, score, person_id, batch, score_type)
    else:
        sql = (
            "\n"
            "UPDATE SCORE\n"
            "SET record = ?, score = ?\n"
            "WHERE id = ? AND score_type = ?;\n"
        )
        params = (record, score, person_id, score_type)
    try:
        self.database.execute(sql, params)
        self.database.commit()
        return OK
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
# 获得所有人员的成绩
def select_all_score(self, batch=None):
    """Return every person together with their result per exercise.

    Each person dict (keyed by ``PERSON_FEATURE_LIST``) gets two extra keys
    per exercise, ``<prefix>_score`` and ``<prefix>_count``. The prefix is the
    exercise identifier, except ``run_bf`` for RUNAROUND and ``hanging`` for
    OVERHANG. Both keys are ``None`` when no result exists. The count is the
    record converted to float and rounded to two decimals.

    Args:
        batch: batch label to restrict the scores to; a falsy value uses all
            score rows.

    Returns:
        The list of person dicts.
    """
    columns = ", ".join(SCORE_FEATURE_LIST)
    if batch:
        data = self.database.execute(
            "SELECT {} FROM SCORE WHERE batch = ?;".format(columns), (batch,)
        )
    else:
        data = self.database.execute("SELECT {} FROM SCORE;".format(columns))

    def key_prefix(score_type):
        # Two exercises are reported under names that differ from their identifier.
        if score_type == RUNAROUND:
            return 'run_bf'
        if score_type == OVERHANG:
            return 'hanging'
        return score_type

    person_data = self.select_all_person()
    # Index the persons by id so each score row is matched without rescanning
    # the whole person list.
    persons_by_id = {}
    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            prefix = key_prefix(exercise)
            person[prefix + '_score'] = None
            person[prefix + '_count'] = None
        persons_by_id.setdefault(person['id'], []).append(person)

    for row in data:
        entry = dict(zip(SCORE_FEATURE_LIST, row))
        if not entry['score_type']:
            # Placeholder row without an exercise type.
            continue
        prefix = key_prefix(entry['score_type'])
        for person in persons_by_id.get(entry['id'], ()):
            person[prefix + '_score'] = entry['score']
            # A NULL record has no numeric value; float(None) would raise.
            person[prefix + '_count'] = (
                None if entry['record'] is None
                else round(float(entry['record']), 2)
            )
    return person_data
# 获得score表
def select_score(self):
    """Return the SCORE rows that have an exercise type, as dicts keyed by ``SCORE_FEATURE_LIST``."""
    columns = ", ".join(SCORE_FEATURE_LIST)
    rows = self.database.execute(f"SELECT {columns} FROM SCORE;")
    entries = (dict(zip(SCORE_FEATURE_LIST, row)) for row in rows)
    # Rows without an exercise type are placeholders and are left out.
    return [entry for entry in entries if entry['score_type']]
# Get the current-batch scores of one class
def get_a_class_score(self, _class):
    """Return the persons of one class with their results in the current batch.

    The current batch is the one returned by ``get_this_batch``. Each person
    dict (keyed by ``PERSON_FEATURE_LIST``) gets two extra keys per exercise,
    ``<prefix>_score`` and ``<prefix>_count``. The prefix is the exercise
    identifier, except ``run_bf`` for RUNAROUND and ``hanging`` for OVERHANG.
    Both keys are ``None`` when no result exists. The count is the record
    converted to float and rounded to two decimals.

    Args:
        _class: class whose persons and scores are selected.

    Returns:
        The list of person dicts.
    """
    this_batch = self.get_this_batch()
    sql = "SELECT {} FROM SCORE WHERE batch = ? AND class = ?;".format(", ".join(SCORE_FEATURE_LIST))
    data = self.database.execute(sql, (this_batch, _class,))

    def key_prefix(score_type):
        # Two exercises are reported under names that differ from their identifier.
        if score_type == RUNAROUND:
            return 'run_bf'
        if score_type == OVERHANG:
            return 'hanging'
        return score_type

    person_data = self.select_class_person(_class)
    # Index the persons by id so each score row is matched without rescanning
    # the whole person list.
    persons_by_id = {}
    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            prefix = key_prefix(exercise)
            person[prefix + '_score'] = None
            person[prefix + '_count'] = None
        persons_by_id.setdefault(person['id'], []).append(person)

    for row in data:
        entry = dict(zip(SCORE_FEATURE_LIST, row))
        if not entry['score_type']:
            # Placeholder row without an exercise type.
            continue
        prefix = key_prefix(entry['score_type'])
        for person in persons_by_id.get(entry['id'], ()):
            person[prefix + '_score'] = entry['score']
            # A NULL record has no numeric value; float(None) would raise.
            person[prefix + '_count'] = (
                None if entry['record'] is None
                else round(float(entry['record']), 2)
            )
    return person_data
2024-09-23 16:02:54 +08:00
# 删除空成绩
def delete_empty_score_type(self):
    """Delete the SCORE rows whose ``score_type`` is NULL or an empty string.

    Returns:
        ``OK`` on success, ``UNKNOWN_ERROR`` on failure.
    """
    statement = "DELETE FROM SCORE WHERE score_type IS NULL OR score_type = '';"
    try:
        self.database.execute(statement)
        self.database.commit()
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    else:
        return OK
2024-09-23 14:54:15 +08:00
# 导出所有人员成绩
def dump_score(self, save_dir):
    """Write the result of ``select_all_score()`` to the Excel file ``save_dir`` (no index column)."""
    frame = pd.DataFrame(self.select_all_score())
    frame.to_excel(save_dir, index=False)
2024-09-23 16:02:54 +08:00
# 删除空batch
def delete_orphan_batches(self):
    """Delete the BATCH_INFO rows whose batch has no row in SCORE.

    Returns:
        ``OK`` when all orphan batches were removed (or none existed),
        ``UNKNOWN_ERROR`` when a lookup or a delete fails.
    """
    all_batches = self.get_all_batch()
    existing_batches = self.get_existing_batches()
    # Both lookups return None on failure. A failed lookup must not be read
    # as "no batch is in use", and iterating None would raise.
    if all_batches is None or existing_batches is None:
        return UNKNOWN_ERROR

    in_use = set(existing_batches)
    sql = "DELETE FROM BATCH_INFO WHERE batch = ?;"
    for batch in all_batches:
        if batch in in_use:
            continue
        try:
            self.database.execute(sql, (batch,))
            self.database.commit()
        except Exception as e:
            print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
            return UNKNOWN_ERROR
    return OK
def get_existing_batches(self):
    """Return the distinct non-NULL batch labels that occur in SCORE.

    Returns:
        A list of labels, or ``None`` when the query fails.
    """
    query = "SELECT DISTINCT batch FROM SCORE WHERE batch IS NOT NULL;"
    try:
        labels = []
        for row in self.database.execute(query):
            labels.append(row[0])
        return labels
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None