1092 lines
38 KiB
Python
1092 lines
38 KiB
Python
import os
|
|
import sqlite3
|
|
import time
|
|
import traceback
|
|
import pandas as pd
|
|
|
|
from AcrossPlatform.get_platform import GLOBAL_DIR
|
|
from SQLDataBase.sql_server import SQLServer
|
|
|
|
# Directory that holds the manager's files, located under the project-wide GLOBAL_DIR.
MANAGER_DIR = os.path.join(GLOBAL_DIR, "Manager")
# File name of the database handed to SQLServer together with MANAGER_DIR.
MANAGER_DATABASE = "complete_database.db"
|
|
|
|
# Person attributes (column names of the PERSON table).
ID = "id"
NAME = "name"
BAND_ID = "band_id"
CLASS = "class"
GENDER = "gender"
AGE = "age"
HEIGHT = "height"
WEIGHT = "weight"
BMI = "bmi"
PBF = "pbf"
PERSON_TYPE = "person_type"
PERSON_FEATURE_LIST = [
    ID, NAME, BAND_ID, CLASS, GENDER, AGE, HEIGHT, WEIGHT, BMI, PBF, PERSON_TYPE,
]
# Spreadsheet headers, positionally aligned with PERSON_FEATURE_LIST.
PERSON_TABLE_COLUMNS = [
    "人员编号", "姓名", "手环", "所在班", "性别", "年龄",
    "身高", "体重", "BMI", "PBF", "人员类型",
]

# Score attributes.
BATCH = "batch"
SCORE_TYPE = "score_type"
RECORD = "record"
SCORE = "score"
COUNT = "count"
GRADE = "grade"

# Exercise keys (stored in the score_type column).
PUSHUP = "pushup"
SITUP = "situp"
OVERHANG = "overhang"
RUNAROUND = "runaround"
RUNNING = "running"
PULLUP = "pullup"
TRICEPDIP = "tricepdip"
# Exercise key -> display name.
EXERCISE_NAME_LIST = {
    PUSHUP: "俯卧撑",
    SITUP: "仰卧起坐",
    OVERHANG: "曲臂悬垂",
    RUNAROUND: "30*2蛇形跑",
    RUNNING: "长跑",
    PULLUP: "引体向上",
    TRICEPDIP: "双杠臂屈伸",
}

# Grade keys.
GRADE_GREAT = "GREAT"
GRADE_GOOD = "GOOD"
GRADE_PASS = "PASS"
GRADE_FAIL = "FAIL"
GRADE_MISS = "MISS"

# Grade key -> display name.
GRADE_LIST = {
    GRADE_GREAT: "优秀",
    GRADE_GOOD: "良好",
    GRADE_PASS: "及格",
    GRADE_FAIL: "不及格",
    GRADE_MISS: "缺考",
}

# Grade keys ordered from best to worst.
GRADE_RANK = [GRADE_GREAT, GRADE_GOOD, GRADE_PASS, GRADE_FAIL, GRADE_MISS]

# Keys used in the statistic result dictionaries.
DATA = "data"
PERCENTAGE = "percentage"
BATCH_AVG = "batch_avg"
TYPE_AVG = "type_avg"
|
|
|
|
|
|
def get_grade(score):
    """Map a numeric score to its grade key.

    90 and above is GRADE_GREAT, [70, 90) GRADE_GOOD, [60, 70) GRADE_PASS,
    [0, 60) GRADE_FAIL; anything else (e.g. a negative score) is GRADE_MISS.
    """
    # Thresholds are tested from highest to lowest, so each check only needs
    # the lower bound of its band.
    if score >= 90:
        return GRADE_GREAT
    if score >= 70:
        return GRADE_GOOD
    if score >= 60:
        return GRADE_PASS
    if score >= 0:
        return GRADE_FAIL
    return GRADE_MISS
|
|
|
|
|
|
# Columns of the SCORE table, in table order.
SCORE_FEATURE_LIST = [ID, NAME, CLASS, BATCH, SCORE_TYPE, RECORD, SCORE]
# Spreadsheet headers, positionally aligned with SCORE_FEATURE_LIST.
SCORE_TABLE_COLUMNS = ["人员编号", "姓名", "所在班", "批次", "课目", "记录", "成绩"]

# Status codes returned by the Database methods.
OK = 0
ID_ERROR = 1
NO_OBJ = 2
UNKNOWN_ERROR = -1
|
|
|
|
|
|
class Database:
|
|
|
|
def __init__(self) -> None:
    """Open the manager database and make sure its three tables exist."""
    super().__init__()
    self.database = SQLServer(server_dir=MANAGER_DIR, server_name=MANAGER_DATABASE)
    self._create_person_table()
    self._create_score_table()
    self._create_batch_table()
|
|
|
|
# 创建人员索引表
|
|
def _create_person_table(self):
    """Create the PERSON table (keyed by ``id``) if it does not exist yet."""
    column_defs = (
        "id CHAR(100) PRIMARY KEY",
        "name CHAR(100)",
        "band_id TEXT",
        "class TEXT",
        "gender CHAR(20)",
        "age INT",
        "height INT",
        "weight INT",
        "bmi FLOAT",
        "pbf FLOAT",
        "person_type INT",
    )
    statement = "CREATE TABLE IF NOT EXISTS PERSON (" + ",".join(column_defs) + ");"
    self.database.execute(statement)
|
|
|
|
def _create_score_table(self):
    """Create the SCORE table if it does not exist yet."""
    column_defs = (
        "id CHAR(100)",
        "name CHAR(100)",
        "class TEXT",
        "batch TEXT",
        "score_type TEXT",
        "record TEXT",
        "score FLOAT",
    )
    statement = "CREATE TABLE IF NOT EXISTS SCORE (" + ",".join(column_defs) + ");"
    self.database.execute(statement)
|
|
|
|
def _create_batch_table(self):
    """Create the BATCH_INFO table (batch label + start time) if missing."""
    column_defs = ("batch TEXT", "start_time INT")
    statement = "CREATE TABLE IF NOT EXISTS BATCH_INFO (" + ",".join(column_defs) + ");"
    self.database.execute(statement)
|
|
|
|
# 获得当前批次
|
|
def get_this_batch(self):
    """Return the label of the most recently started batch.

    Returns:
        The ``batch`` value of the BATCH_INFO row with the largest
        ``start_time``, or None when the table is empty or the query fails
        (the failure is printed with its traceback).
    """
    sql = "SELECT batch FROM BATCH_INFO " \
          "ORDER BY start_time DESC " \
          "LIMIT 1"
    try:
        rows = self.database.execute(sql)
        # The query selects a single column, so read it by position instead
        # of routing it through the unrelated SCORE column names.
        for row in rows:
            return row[0]
        return None
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|
|
|
|
# 新增一个批次
|
|
def generate_batch(self):
    """Register a batch named after the current local hour and return its label.

    The label has the form ``YYYY年MM月DD日HH时``. A BATCH_INFO row is inserted
    only when no row with that label exists yet; the transaction is committed
    either way.

    Returns:
        The batch label, or None when the database raised (non-integrity
        errors are printed with their traceback).
    """
    now = int(time.time())
    label = time.strftime("%Y年%m月%d日%H时", time.localtime(now))
    try:
        existing = self.database.execute(
            "SELECT batch, start_time FROM BATCH_INFO WHERE batch = ?;", (label,)
        )
        if not existing:
            self.database.execute(
                "INSERT INTO BATCH_INFO (batch, start_time) VALUES (?, ?);", (label, now)
            )
        self.database.commit()
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
    return label
|
|
|
|
def get_all_score(self):
    """Return every SCORE row as a dict keyed by SCORE_FEATURE_LIST.

    Returns None when the query fails (non-integrity errors are printed).
    """
    columns = ", ".join(SCORE_FEATURE_LIST)
    sql = "SELECT " + columns + " FROM SCORE"
    try:
        rows = self.database.execute(sql)
        # Columns come back in SELECT order, i.e. SCORE_FEATURE_LIST order.
        return [dict(zip(SCORE_FEATURE_LIST, row)) for row in rows]
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|
|
|
|
def get_all_batch(self):
    """Return the distinct batch labels of BATCH_INFO ordered by start_time.

    Returns None when the query fails (non-integrity errors are printed).
    """
    sql = "SELECT {0} FROM BATCH_INFO GROUP BY {0} ORDER BY start_time".format(BATCH)
    try:
        labels = []
        for row in self.database.execute(sql):
            labels.append(row[0])
        return labels
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|
|
|
|
def get_all_id(self):
    """Return the sorted list of distinct person ids in PERSON.

    Returns None when the query or the sort fails (non-integrity errors are
    printed).
    """
    sql = "SELECT {0} FROM PERSON GROUP BY {0}".format(ID)
    try:
        rows = self.database.execute(sql)
        return sorted(row[0] for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|
|
|
|
def get_all_name(self):
    """Return a ``{person id: name}`` map covering every row of PERSON.

    Rows are read in id order. The query deliberately does not group by
    name: grouping collapsed people who share a name into one row, which
    left the other ids without an entry in the map.

    Returns:
        The id-to-name dict, or None when the query fails (non-integrity
        errors are printed with their traceback).
    """
    sql = f"SELECT {ID}, {NAME} FROM PERSON ORDER BY {ID}"
    try:
        data = self.database.execute(sql)
        return {row[0]: row[1] for row in data}
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|
|
|
|
def get_id_class(self):
    """Return a ``{person id: class}`` map built from PERSON, read in id order.

    Returns None when the query fails (non-integrity errors are printed).
    """
    sql = "SELECT {}, {} FROM PERSON ORDER BY {}".format(ID, CLASS, ID)
    try:
        mapping = {}
        for row in self.database.execute(sql):
            mapping[row[0]] = row[1]
        return mapping
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|
|
|
|
def get_all_class(self):
    """Return the sorted list of distinct class values found in PERSON.

    Returns None when the query or the sort fails (non-integrity errors are
    printed).
    """
    sql = "SELECT {0} FROM PERSON GROUP BY {0}".format(CLASS)
    try:
        rows = self.database.execute(sql)
        return sorted(row[0] for row in rows)
    except sqlite3.IntegrityError:
        return None
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|
|
|
|
@staticmethod
def get_all_exercise_types():
    """Return the exercise keys of EXERCISE_NAME_LIST as a new list."""
    return [exercise for exercise in EXERCISE_NAME_LIST]
|
|
|
|
# 获得各课目训练汇总
|
|
def get_exercise_statistic(self, type_list, batch):
    """Group every person by grade for each requested exercise.

    Args:
        type_list: Exercise keys (``score_type`` values) to report on.
        batch: Container of batch labels; a score row is used when
            ``row[BATCH] in batch`` holds.

    Returns:
        ``{type: {grade: {DATA: [...], PERCENTAGE: float}}}``. Each DATA entry
        carries ID, NAME, SCORE and RECORD. A person without a matching score
        is listed under GRADE_MISS with score 0. PERCENTAGE is the share of
        all persons in PERSON. If anything raises, the traceback is printed
        and the table built so far is returned.
    """
    grade_table = {
        _type: {grade: {DATA: [], PERCENTAGE: 0} for grade in GRADE_LIST}
        for _type in type_list
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()

        # Everybody starts as "missed" for every requested exercise.
        person_group = {
            _id: {
                _type: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for _type in type_list
            }
            for _id in all_id
        }

        for data in full_data:
            if data[SCORE_TYPE] not in type_list or data[BATCH] not in batch:
                continue
            person = person_group.get(data[ID])
            if person is None:
                # Score row whose person is not in PERSON: skip it rather
                # than abort the whole statistic with a KeyError.
                continue
            slot = person[data[SCORE_TYPE]]
            slot[GRADE] = get_grade(data[SCORE])
            slot[SCORE] = data[SCORE]
            slot[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            for _type, type_values in id_values.items():
                grade_table[_type][type_values[GRADE]][DATA].append({
                    ID: _id,
                    # .get: a missing name must not discard the statistic.
                    NAME: all_name.get(_id),
                    SCORE: type_values[SCORE],
                    RECORD: type_values[RECORD],
                })

        total = len(all_id)
        if total:  # with no persons the percentages stay at 0
            for type_values in grade_table.values():
                for grade_values in type_values.values():
                    grade_values[PERCENTAGE] = len(grade_values[DATA]) / total
    except Exception as e:
        print(e.args, traceback.format_exc())
    return grade_table
|
|
|
|
# 各班级科目汇总
|
|
def get_class_statistic(self, batch, _type):
    """Group the members of every class by grade for one exercise and batch.

    Args:
        batch: Batch label; only score rows whose batch equals it are used.
        _type: Exercise key; only score rows of this ``score_type`` are used.

    Returns:
        ``{class: {grade: {DATA: [...], PERCENTAGE: float}}}``. Each DATA entry
        carries ID, NAME, SCORE and RECORD. PERCENTAGE is the share within the
        class. A person without a matching score is counted under GRADE_MISS
        in the class recorded in PERSON. If anything raises, the traceback is
        printed and the table built so far is returned.
    """
    # get_all_class() returns None on a database error; report no classes then.
    all_class = self.get_all_class() or []
    class_table = {
        _class: {grade: {DATA: [], PERCENTAGE: 0} for grade in GRADE_LIST}
        for _class in all_class
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        all_id_class = self.get_id_class()

        # The class is taken from PERSON up front so that people without a
        # score still land in their class (previously they kept class None and
        # the table lookup below raised KeyError).
        person_group = {
            _id: {
                GRADE: GRADE_MISS,
                SCORE: 0,
                RECORD: None,
                CLASS: all_id_class.get(_id),
            }
            for _id in all_id
        }

        for data in full_data:
            if data[SCORE_TYPE] != _type or data[BATCH] != batch:
                continue
            person = person_group.get(data[ID])
            if person is None:
                # Score row whose person is not in PERSON.
                continue
            person[GRADE] = get_grade(data[SCORE])
            person[SCORE] = data[SCORE]
            person[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            grades = class_table.get(id_values[CLASS])
            if grades is None:
                continue
            grades[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD],
            })

        for class_values in class_table.values():
            class_size = sum(len(g[DATA]) for g in class_values.values())
            if not class_size:
                continue
            for grade_values in class_values.values():
                grade_values[PERCENTAGE] = len(grade_values[DATA]) / class_size
    except Exception as e:
        print(e.args, traceback.format_exc())
    return class_table
|
|
|
|
# 集体科目训练汇总
|
|
def get_group_statistic(self, _type):
    """Group every person by grade for one exercise, separately per batch.

    Args:
        _type: Exercise key; only score rows of this ``score_type`` are used.

    Returns:
        ``{batch: {grade: {DATA: [...], PERCENTAGE: float}}}`` for every batch
        in BATCH_INFO. Each DATA entry carries ID, NAME, SCORE and RECORD.
        PERCENTAGE is the share within the batch. A person without a score in
        a batch is counted under GRADE_MISS for that batch. If anything
        raises, the traceback is printed and the table built so far is
        returned.
    """
    # get_all_batch() returns None on a database error; report no batches then.
    all_batch = self.get_all_batch() or []
    batch_table = {
        batch: {grade: {DATA: [], PERCENTAGE: 0} for grade in GRADE_LIST}
        for batch in all_batch
    }
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()

        person_group = {
            _id: {
                batch: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
                for batch in all_batch
            }
            for _id in all_id
        }

        for data in full_data:
            if data[SCORE_TYPE] != _type:
                continue
            # Skip rows that cannot be placed: unknown person, or a batch that
            # is NULL / not registered in BATCH_INFO. Indexing them directly
            # raised KeyError and discarded the whole statistic.
            slot = person_group.get(data[ID], {}).get(data[BATCH])
            if slot is None:
                continue
            slot[GRADE] = get_grade(data[SCORE])
            slot[SCORE] = data[SCORE]
            slot[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            for batch, batch_values in id_values.items():
                batch_table[batch][batch_values[GRADE]][DATA].append({
                    ID: _id,
                    NAME: all_name.get(_id),
                    SCORE: batch_values[SCORE],
                    RECORD: batch_values[RECORD],
                })

        for batch_values in batch_table.values():
            batch_size = sum(len(g[DATA]) for g in batch_values.values())
            if not batch_size:
                continue
            for grade_values in batch_values.values():
                grade_values[PERCENTAGE] = len(grade_values[DATA]) / batch_size
    except Exception as e:
        print(e.args, traceback.format_exc())
    return batch_table
|
|
|
|
# 集体统计
|
|
def get_group_multiple_statistic(self, _type, class_list, batch):
    """Group the members of the given classes by grade for one exercise and batch.

    Args:
        _type: Exercise key; only score rows of this ``score_type`` are used.
        class_list: Classes to include, matched against both the class in
            PERSON and the class stored on the score row.
        batch: Batch label; only score rows whose batch equals it are used.

    Returns:
        ``{grade: {DATA: [...]}}`` where each DATA entry carries ID, NAME,
        SCORE and RECORD. A selected person without a matching score is
        listed under GRADE_MISS. If anything raises, the traceback is printed
        and the table built so far is returned.
    """
    table = {grade: {DATA: []} for grade in GRADE_LIST}
    try:
        full_data = self.get_all_score()
        all_id = self.get_all_id()
        all_name = self.get_all_name()
        id_class = self.get_id_class()

        person_group = {
            _id: {GRADE: GRADE_MISS, SCORE: 0, RECORD: None}
            for _id in all_id
            if id_class.get(_id) in class_list
        }

        for data in full_data:
            if (
                data[SCORE_TYPE] != _type
                or data[CLASS] not in class_list
                or data[BATCH] != batch
            ):
                continue
            person = person_group.get(data[ID])
            if person is None:
                # The row's person is unknown or currently belongs to a class
                # outside class_list; skip it instead of raising KeyError.
                continue
            person[GRADE] = get_grade(data[SCORE])
            person[SCORE] = data[SCORE]
            person[RECORD] = data[RECORD]

        for _id, id_values in person_group.items():
            table[id_values[GRADE]][DATA].append({
                ID: _id,
                NAME: all_name.get(_id),
                SCORE: id_values[SCORE],
                RECORD: id_values[RECORD],
            })
    except Exception as e:
        print(e.args, traceback.format_exc())
    return table
|
|
|
|
# 个人统计
|
|
def get_individual_statistic(self, _id, _type_list, batch_list):
    """Collect one person's scores per batch and exercise.

    Returns:
        A dict with three entries: BATCH maps ``batch -> type -> {SCORE: value}``
        (0 where no score exists), BATCH_AVG maps each batch to the sum of its
        scores divided by ``len(_type_list)``, and TYPE_AVG maps each exercise
        to the highest score seen across the batches (despite its name). If
        anything raises, the traceback is printed and the structure built so
        far is returned.
    """
    group = {
        BATCH: {},
        BATCH_AVG: {batch: 0 for batch in batch_list},
        TYPE_AVG: {_type: 0 for _type in _type_list},
    }
    for batch in batch_list:
        if batch not in group[BATCH]:
            group[BATCH][batch] = {_type: {SCORE: 0} for _type in _type_list}
    try:
        full_data = self.get_all_score()
        rows = full_data.copy()
        rows = [row for row in rows if row[ID] == _id]
        rows = [row for row in rows if row[SCORE_TYPE] in _type_list]
        rows = [row for row in rows if row[BATCH] in batch_list]

        for row in rows:
            group[BATCH][row[BATCH]][row[SCORE_TYPE]][SCORE] = row[SCORE]

        type_count = len(_type_list)
        for batch, per_type in group[BATCH].items():
            batch_total = sum(entry[SCORE] for entry in per_type.values())
            group[BATCH_AVG][batch] = batch_total / type_count
            for _type, entry in per_type.items():
                group[TYPE_AVG][_type] = max(entry[SCORE], group[TYPE_AVG][_type])
    except Exception as e:
        print(e.args, traceback.format_exc())
    return group
|
|
|
|
# 插入信息
|
|
def insert_a_info(self, data):
    """Insert one person into PERSON plus a matching row into SCORE.

    Values are read from ``data`` with ``.get``, so keys absent from it are
    stored as NULL. No commit is issued here.
    NOTE(review): the commit call was commented out in the original — confirm
    that a caller commits.

    Returns:
        OK on success, ID_ERROR on an integrity error, UNKNOWN_ERROR on any
        other exception (which is printed with its traceback).
    """
    person_values = [data.get(column) for column in PERSON_FEATURE_LIST]
    person_sql = "INSERT INTO PERSON ({}) VALUES ({});".format(
        ", ".join(PERSON_FEATURE_LIST),
        ", ".join("?" * len(PERSON_FEATURE_LIST)),
    )
    score_values = [data.get(column) for column in SCORE_FEATURE_LIST]
    score_sql = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    try:
        self.database.execute(person_sql, person_values)
        self.database.execute(score_sql, score_values)
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 从Excel导入一个数据
|
|
def insert_many_person_from_xlsx(self, xlsx_file_name):
    """Import people from an Excel sheet into PERSON and SCORE.

    The sheet must contain the PERSON_TABLE_COLUMNS headers; its columns are
    matched positionally to PERSON_FEATURE_LIST. For every person a SCORE row
    is inserted too, filled with the fields both tables share (the others are
    NULL).

    Returns:
        OK after committing, ID_ERROR on an integrity error. Other exceptions
        propagate.
    """
    sheet = pd.read_excel(xlsx_file_name)
    rows = sheet[PERSON_TABLE_COLUMNS].values.tolist()
    person_sql = "INSERT INTO PERSON ({}) VALUES ({});".format(
        ", ".join(PERSON_FEATURE_LIST),
        ", ".join("?" * len(PERSON_FEATURE_LIST)),
    )
    score_sql = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    people = [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
    try:
        self.database.executemany(person_sql, rows)
        score_rows = [
            [person.get(column) for column in SCORE_FEATURE_LIST]
            for person in people
        ]
        self.database.executemany(score_sql, score_rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
|
|
|
|
# 从json导入一个数据
|
|
def insert_many_person_from_json(self, data):
    """Insert a list of person dicts into PERSON and commit.

    Every dict must provide all PERSON_FEATURE_LIST keys (a missing key
    raises KeyError).

    Returns:
        OK on success, ID_ERROR on an integrity error.
    """
    sql = "INSERT INTO PERSON ({}) VALUES ({});".format(
        ", ".join(PERSON_FEATURE_LIST),
        ", ".join("?" * len(PERSON_FEATURE_LIST)),
    )
    try:
        rows = []
        for person in data:
            rows.append([person[column] for column in PERSON_FEATURE_LIST])
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
|
|
|
|
# 删除人员信息
|
|
def delete_a_person(self, _id):
    """Delete a person and all of their score rows.

    Returns:
        NO_OBJ when the lookup for ``_id`` in PERSON yields a falsy result,
        OK after both deletes are committed, UNKNOWN_ERROR when a delete or
        the commit raises (the exception is printed).
    """
    found = self.database.execute("SELECT * FROM PERSON WHERE id = ?", (_id,))
    if not found:
        return NO_OBJ
    try:
        for statement in (
            "DELETE FROM PERSON WHERE id = ?",
            "DELETE FROM SCORE WHERE id = ?",
        ):
            self.database.execute(statement, (_id,))
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 删除所有人员信息
|
|
def delete_all_person(self):
    """Delete every row of PERSON and SCORE and commit.

    Returns:
        OK on success, UNKNOWN_ERROR when anything raises (printed with its
        traceback).
    """
    try:
        for statement in ("DELETE FROM PERSON", "DELETE FROM SCORE"):
            self.database.execute(statement)
        self.database.commit()
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 获得某个人员的信息
|
|
def select_a_person(self, _id):
    """Return the PERSON rows with the given id as a list of dicts keyed by PERSON_FEATURE_LIST."""
    sql = "SELECT {} FROM PERSON WHERE id = ?;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql, (_id,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
|
|
|
|
# 获得某个人员的信息
|
|
def select_a_person_from_band(self, band_id):
    """Return the PERSON rows with the given band_id as a list of dicts keyed by PERSON_FEATURE_LIST."""
    sql = "SELECT {} FROM PERSON WHERE band_id = ?;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql, (band_id,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
|
|
|
|
# 获取一个班级的所有人员
|
|
def get_a_class(self, _class):
    """Return every person of the given class as a list of dicts keyed by PERSON_FEATURE_LIST.

    Runs the same query as ``select_class_person`` and therefore delegates to it.
    """
    return self.select_class_person(_class)
|
|
|
|
# 获得所有人员的信息
|
|
def select_all_person(self):
    """Return every PERSON row as a list of dicts keyed by PERSON_FEATURE_LIST."""
    sql = "SELECT {} FROM PERSON;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql)
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
|
|
|
|
# 获得一个班的人员的信息
|
|
def select_class_person(self, _class):
    """Return the PERSON rows of the given class as a list of dicts keyed by PERSON_FEATURE_LIST."""
    sql = "SELECT {} FROM PERSON WHERE class = ?;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql, (_class,))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
|
|
|
|
# 修改指定人员的信息
|
|
def update_a_person(self, data):
    """Update a person and mirror the shared fields onto their score rows.

    Only the columns actually present in ``data`` are written. Previously
    every column was set from ``data.get(column)``, so a person dict without
    batch/score_type/record/score keys overwrote those columns with NULL on
    all of the person's SCORE rows, and absent person fields were nulled in
    PERSON as well. A caller that supplies every key gets the same statements
    as before.

    Args:
        data: Mapping that must contain the ID key; any further
            PERSON_FEATURE_LIST / SCORE_FEATURE_LIST keys are applied to the
            respective table for the rows matching that id.

    Returns:
        OK after committing, NO_OBJ when the lookup for the id in PERSON
        yields a falsy result. Database exceptions propagate.
    """
    person_id = data[ID]
    if not self.database.execute("SELECT * FROM PERSON WHERE id = ?", (person_id,)):
        return NO_OBJ
    for table, features in (("PERSON", PERSON_FEATURE_LIST), ("SCORE", SCORE_FEATURE_LIST)):
        # ID is always present (checked above), so the SET clause is never empty.
        columns = [feature for feature in features if feature in data]
        assignments = ", ".join("{}=?".format(column) for column in columns)
        values = [data[column] for column in columns] + [person_id]
        self.database.execute(
            "UPDATE {} SET {} WHERE id = ?".format(table, assignments), values
        )
    self.database.commit()
    return OK
|
|
|
|
# 搜索一个名字
|
|
def select_a_person_from_name(self, name):
    """Return the PERSON rows whose name contains ``name``.

    The search text is bound as a query parameter instead of being pasted
    into the SQL, so quotes in it can neither break nor alter the statement.
    ``%`` and ``_`` inside ``name`` still act as LIKE wildcards, as before.

    Returns:
        A list of dicts keyed by PERSON_FEATURE_LIST.
    """
    sql = "SELECT {} FROM PERSON WHERE NAME LIKE ?;".format(", ".join(PERSON_FEATURE_LIST))
    rows = self.database.execute(sql, ("%{}%".format(name),))
    return [dict(zip(PERSON_FEATURE_LIST, row)) for row in rows]
|
|
|
|
# 搜索一个名字
|
|
def select_a_score_from_name(self, name):
    """Return the SCORE rows whose name contains ``name``.

    The search text is bound as a query parameter instead of being pasted
    into the SQL, so quotes in it can neither break nor alter the statement.
    ``%`` and ``_`` inside ``name`` still act as LIKE wildcards, as before.

    Returns:
        A list of dicts keyed by SCORE_FEATURE_LIST.
    """
    sql = "SELECT {} FROM SCORE WHERE NAME LIKE ?;".format(", ".join(SCORE_FEATURE_LIST))
    rows = self.database.execute(sql, ("%{}%".format(name),))
    return [dict(zip(SCORE_FEATURE_LIST, row)) for row in rows]
|
|
|
|
# 导出所有人员信息
|
|
def dump_person(self, save_dir):
    """Write every PERSON row to the Excel file at ``save_dir`` (no index column)."""
    people = self.select_all_person()
    pd.DataFrame(people).to_excel(save_dir, index=False)
|
|
|
|
# 从Excel导入成绩
|
|
def insert_many_score_from_xlsx(self, xlsx_file_dir):
    """Import score rows from an Excel sheet into SCORE and commit.

    The sheet must contain the SCORE_TABLE_COLUMNS headers; its columns are
    matched positionally to SCORE_FEATURE_LIST.

    Returns:
        OK on success, ID_ERROR on an integrity error.
    """
    sheet = pd.read_excel(xlsx_file_dir)
    rows = sheet[SCORE_TABLE_COLUMNS].values.tolist()
    sql = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    try:
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
|
|
|
|
# 从json导入成绩
|
|
def insert_many_score_from_json(self, data):
    """Insert a list of score dicts into SCORE and commit.

    Every dict must provide all SCORE_FEATURE_LIST keys (a missing key raises
    KeyError).

    Returns:
        OK on success, ID_ERROR on an integrity error.
    """
    sql = "INSERT INTO SCORE ({}) VALUES ({});".format(
        ", ".join(SCORE_FEATURE_LIST),
        ", ".join("?" * len(SCORE_FEATURE_LIST)),
    )
    try:
        rows = []
        for entry in data:
            rows.append([entry[column] for column in SCORE_FEATURE_LIST])
        self.database.executemany(sql, rows)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    return OK
|
|
|
|
# 删除人员信息
|
|
def delete_a_score(self, _id):
    """Delete every SCORE row belonging to the given person id.

    Returns:
        NO_OBJ when the lookup for ``_id`` in SCORE yields a falsy result,
        OK after the delete is committed, UNKNOWN_ERROR when the delete or
        commit raises (the exception is printed).
    """
    found = self.database.execute("SELECT * FROM SCORE WHERE id = ?", (_id,))
    if not found:
        return NO_OBJ
    try:
        self.database.execute("DELETE FROM SCORE WHERE id = ?", (_id,))
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 删除所有人员成绩
|
|
def clear_all_score(self):
    """Delete every row of SCORE and commit.

    Returns:
        OK on success, UNKNOWN_ERROR when anything raises (the exception is
        printed).
    """
    try:
        self.database.execute("DELETE FROM SCORE;")
        self.database.commit()
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 删除所有人员成绩
|
|
def delete_all_score(self):
    """Delete every row of SCORE and commit.

    Performs exactly what ``clear_all_score`` does and delegates to it.

    Returns:
        OK on success, UNKNOWN_ERROR when anything raises (the exception is
        printed).
    """
    return self.clear_all_score()
|
|
|
|
# 查询成绩是否存在
|
|
def select_a_score(self, person_id, score_type, batch=None):
    """Look up a person's score rows for one exercise.

    With ``batch`` None only rows whose batch IS NULL are matched; otherwise
    the batch must equal ``batch``. Rows with an empty score_type are dropped.

    Returns:
        A non-empty list of dicts keyed by SCORE_FEATURE_LIST, None when
        nothing matched, or UNKNOWN_ERROR when the query raised (the exception
        is printed).
    """
    if batch is None:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch IS NULL AND score_type = ?;"
        params = (person_id, score_type)
    else:
        sql = "SELECT * FROM SCORE WHERE id = ? AND batch = ? AND score_type = ?;"
        params = (person_id, batch, score_type)
    try:
        # SELECT * relies on the table's column order matching SCORE_FEATURE_LIST.
        candidates = (
            dict(zip(SCORE_FEATURE_LIST, row))
            for row in self.database.execute(sql, params)
        )
        matches = [entry for entry in candidates if entry['score_type']]
        return matches if matches else None
    except Exception as e:
        print(e)
        return UNKNOWN_ERROR
|
|
|
|
# 插入一条成绩
|
|
def insert_a_score(self, person_id, name, person_class, score_type, record, score, batch=None):
    """Insert one score row and commit.

    When ``batch`` is None the batch column is left out of the INSERT (and is
    therefore NULL).

    Returns:
        OK on success, UNKNOWN_ERROR when anything raises (printed with its
        traceback).
    """
    with_batch = batch is not None
    if with_batch:
        sql = "INSERT INTO SCORE (id, name, class, batch, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?, ?);"
        values = (person_id, name, person_class, batch, score_type, record, score)
    else:
        sql = "INSERT INTO SCORE (id, name, class, score_type, record, score) VALUES (?, ?, ?, ?, ?, ?);"
        values = (person_id, name, person_class, score_type, record, score)
    try:
        self.database.execute(sql, values)
        self.database.commit()
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 插入计时类项目成绩
|
|
def update_timekeeping_score(self, record, score, person_id, score_type, batch=None):
    """Store the record and score of a timed exercise on existing SCORE rows.

    Nothing happens (and None is returned) when ``record`` is None or -1.
    With ``batch`` None every row of the person and exercise is updated,
    regardless of its batch; otherwise only rows of that batch.

    Returns:
        OK after committing, ID_ERROR on an integrity error, UNKNOWN_ERROR on
        any other exception (printed with its traceback), None when skipped.
    """
    if record is None or record == -1:
        return None
    if batch is None:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND score_type = ?;
        """
        values = (record, score, person_id, score_type)
    else:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND batch = ? AND score_type = ?;
        """
        values = (record, score, person_id, batch, score_type)
    try:
        self.database.execute(sql, values)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 插入计数类项目成绩
|
|
def update_counting_score(self, record, score, person_id, score_type, batch=None):
    """Store the record and score of a counted exercise on existing SCORE rows.

    Nothing happens (and None is returned) when ``record`` is falsy and not
    equal to 0, e.g. None. With ``batch`` None every row of the person and
    exercise is updated, regardless of its batch; otherwise only rows of that
    batch.

    Returns:
        OK after committing, ID_ERROR on an integrity error, UNKNOWN_ERROR on
        any other exception (printed with its traceback), None when skipped.
    """
    has_record = record or record == 0
    if not has_record:
        return None
    if batch is None:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND score_type = ?;
        """
        values = (record, score, person_id, score_type)
    else:
        sql = """
            UPDATE SCORE
            SET record = ?, score = ?
            WHERE id = ? AND batch = ? AND score_type = ?;
        """
        values = (record, score, person_id, batch, score_type)
    try:
        self.database.execute(sql, values)
        self.database.commit()
    except sqlite3.IntegrityError:
        return ID_ERROR
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 获得所有人员的成绩
|
|
def select_all_score(self, batch=None):
    """Return every person with their scores flattened into per-exercise columns.

    Each person dict (keyed by PERSON_FEATURE_LIST) gains a ``<prefix>_score``
    and a ``<prefix>_count`` entry per exercise, initially None. The prefix is
    the exercise key, except ``run_bf`` for RUNAROUND and ``hanging`` for
    OVERHANG. Matching score rows fill in the score and the record rounded to
    two decimals.

    Args:
        batch: When truthy, only score rows of this batch are used; otherwise
            all score rows.
    """
    def _column_names(exercise):
        # Two exercises are exported under a different column prefix.
        if exercise == RUNAROUND:
            prefix = 'run_bf'
        elif exercise == OVERHANG:
            prefix = 'hanging'
        else:
            prefix = exercise
        return prefix + '_score', prefix + '_count'

    columns = ", ".join(SCORE_FEATURE_LIST)
    if batch:
        data = self.database.execute(
            "SELECT {} FROM SCORE WHERE batch = ?;".format(columns), (batch,)
        )
    else:
        data = self.database.execute("SELECT {} FROM SCORE;".format(columns))

    person_data = self.select_all_person()
    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            score_key, count_key = _column_names(exercise)
            person[score_key] = None
            person[count_key] = None

    # Rows without a score_type carry no result and are ignored.
    score_rows = [
        entry
        for entry in (dict(zip(SCORE_FEATURE_LIST, row)) for row in data)
        if entry['score_type']
    ]
    for s in score_rows:
        for p in person_data:
            if s['id'] == p['id']:
                score_key, count_key = _column_names(s['score_type'])
                p[score_key] = s['score']
                p[count_key] = round(float(s['record']), 2)
    return person_data
|
|
|
|
# 获得score表
|
|
def select_score(self):
    """Return the SCORE rows that have a non-empty score_type, as dicts keyed by SCORE_FEATURE_LIST."""
    sql = "SELECT {} FROM SCORE;".format(", ".join(SCORE_FEATURE_LIST))
    rows = self.database.execute(sql)
    entries = (dict(zip(SCORE_FEATURE_LIST, row)) for row in rows)
    return [entry for entry in entries if entry['score_type']]
|
|
|
|
# 获得所有人员的成绩
|
|
def get_a_class_score(self, _class):
    """Return the people of one class with their current-batch scores flattened.

    The batch is the one returned by ``get_this_batch``. Each person dict
    (keyed by PERSON_FEATURE_LIST) gains a ``<prefix>_score`` and a
    ``<prefix>_count`` entry per exercise, initially None. The prefix is the
    exercise key, except ``run_bf`` for RUNAROUND and ``hanging`` for
    OVERHANG. Matching score rows fill in the score and the record rounded to
    two decimals.
    """
    def _column_names(exercise):
        # Two exercises are exported under a different column prefix.
        if exercise == RUNAROUND:
            prefix = 'run_bf'
        elif exercise == OVERHANG:
            prefix = 'hanging'
        else:
            prefix = exercise
        return prefix + '_score', prefix + '_count'

    this_batch = self.get_this_batch()
    sql = "SELECT {} FROM SCORE WHERE batch = ? AND class = ?;".format(", ".join(SCORE_FEATURE_LIST))
    data = self.database.execute(sql, (this_batch, _class,))

    person_data = self.select_class_person(_class)
    for person in person_data:
        for exercise in EXERCISE_NAME_LIST:
            score_key, count_key = _column_names(exercise)
            person[score_key] = None
            person[count_key] = None

    # Rows without a score_type carry no result and are ignored.
    score_rows = [
        entry
        for entry in (dict(zip(SCORE_FEATURE_LIST, row)) for row in data)
        if entry['score_type']
    ]
    for s in score_rows:
        for p in person_data:
            if s['id'] == p['id']:
                score_key, count_key = _column_names(s['score_type'])
                p[score_key] = s['score']
                p[count_key] = round(float(s['record']), 2)
    return person_data
|
|
|
|
# 删除空成绩
|
|
def delete_empty_score_type(self):
    """Delete the SCORE rows whose score_type is NULL or empty, then commit.

    Returns:
        OK on success, UNKNOWN_ERROR when anything raises (printed with its
        traceback).
    """
    try:
        self.database.execute(
            "DELETE FROM SCORE WHERE score_type IS NULL OR score_type = '';"
        )
        self.database.commit()
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return UNKNOWN_ERROR
    return OK
|
|
|
|
# 导出所有人员成绩
|
|
def dump_score(self, save_dir):
    """Write the result of ``select_all_score()`` to the Excel file at ``save_dir`` (no index column)."""
    flattened = self.select_all_score()
    pd.DataFrame(flattened).to_excel(save_dir, index=False)
|
|
|
|
# 删除空batch
|
|
def delete_orphan_batches(self):
    """Delete the BATCH_INFO rows whose batch has no row in SCORE.

    Each delete is committed on its own, so batches removed before a failure
    stay removed.

    Returns:
        OK when every orphan batch was deleted (or there was none),
        UNKNOWN_ERROR when either batch list could not be read or a delete
        raised (printed with its traceback).
    """
    all_batches = self.get_all_batch()
    existing_batches = self.get_existing_batches()
    # Both helpers return None on a database error. Without this check the
    # loop below raised TypeError.
    if all_batches is None or existing_batches is None:
        return UNKNOWN_ERROR

    in_use = set(existing_batches)
    sql = "DELETE FROM BATCH_INFO WHERE batch = ?;"
    for batch in all_batches:
        if batch in in_use:
            continue
        try:
            self.database.execute(sql, (batch,))
            self.database.commit()
        except Exception as e:
            print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
            return UNKNOWN_ERROR
    return OK
|
|
|
|
def get_existing_batches(self):
    """Return the distinct non-NULL batch labels that occur in SCORE.

    Returns None when the query fails (the error is printed with its
    traceback).
    """
    try:
        rows = self.database.execute(
            "SELECT DISTINCT batch FROM SCORE WHERE batch IS NOT NULL;"
        )
        labels = []
        for row in rows:
            labels.append(row[0])
        return labels
    except Exception as e:
        print(f"数据库错误,错误原因{e.args}\n{traceback.format_exc()}")
        return None
|