# LISHUZUOXUN_yangjiang/BaseStation/fake_hrbo_info_pyqt.py

# 92 lines
# 3.0 KiB
# Python

import random
import pandas as pd
from LSZXBackend.general import *
from LSZXNetWork.lszx_network import *
# String constants. Nothing in the visible part of this file reads them;
# presumably they are keys shared with modules that import this one -- verify.
MAX_HR = "max_hr"
MIN_BO = "min_bo"
HR = "hr"
BO = "bo"
# Path <GLOBAL_DIR>/LSZXVideo/Video; passed to Server as cache_path in
# BaseStation.__init__. `os` and GLOBAL_DIR are not imported by name here --
# presumably they arrive through the star imports above; verify.
video_path = os.path.join(GLOBAL_DIR, "LSZXVideo", "Video")
# Simulates collecting heart-rate / blood-oxygen ("hrbo") readings for personnel.
class BaseStation:
    """Fake base station that fabricates hr/bo readings and sends them out.

    On construction the band ids are read from the ``band_id`` column of the
    ``PERSON`` sheet in ``<GLOBAL_DIR>/BaseStation/PERSON.xlsx`` and every
    band gets an initial "green" record.  ``__init__`` also starts a thread
    running :meth:`send_fake_information`.  :meth:`get_information` is NOT
    started here; the caller has to run it (the script entry point of this
    file does so in its own thread).

    ``HrBo_info`` maps each band id to a record of the form::

        {'hr': {'hr': <int>, 'color': 'G'|'Y'|'R'},
         'bo': {'bo': '<percent string>', 'color': 'G'|'Y'|'R'},
         'normal': True,
         'record_time': <time.time() of the last refresh>}

    The ``'normal'`` flag is set once at construction and never changed by
    this class.
    """

    # A record is replaced once it is older than this many seconds.
    REFRESH_AFTER_S = 15
    # Pause between two passes of either worker loop, in seconds.
    LOOP_DELAY_S = 0.5
    # Inclusive bounds handed to random.randint for the fabricated values.
    FAKE_HR_RANGE = (72, 145)
    FAKE_BO_RANGE = (83, 97)
    # Heart rate: above HR_YELLOW_ABOVE -> 'Y', above HR_RED_ABOVE -> 'R'.
    HR_YELLOW_ABOVE = 140
    HR_RED_ABOVE = 180
    # Blood oxygen: below BO_YELLOW_BELOW -> 'Y', below BO_RED_BELOW -> 'R'.
    BO_YELLOW_BELOW = 88
    BO_RED_BELOW = 80

    def __init__(self):
        """Load the band ids, seed the records and start the sender thread."""
        self.send_information_signal = threading.Event()

        file_path = os.path.join(GLOBAL_DIR, 'BaseStation', 'PERSON.xlsx')
        df = pd.read_excel(file_path, sheet_name='PERSON')
        # Blank cells are read back as NaN; drop them so that no record is
        # keyed by NaN.  An empty column simply yields an empty list.
        band_ids = df['band_id'].dropna().tolist()

        self.HrBo_info = {}
        for band_id in band_ids:
            self.HrBo_info[band_id] = {
                'hr': {'hr': 85, 'color': 'G'},
                'bo': {'bo': "{:.2%}".format(94 / 100), 'color': 'G'},
                'normal': True,
                'record_time': time.time(),
            }

        self.ip = '127.0.0.1'
        self.server = Server(ip="0.0.0.0", port=server_port, file_port=server_file_port, cache_path=video_path)
        self.client = Client(ip="0.0.0.0")
        # Started last so that self.client and self.HrBo_info already exist
        # when the sender loop first touches them.
        threading.Thread(target=self.send_fake_information).start()

    @staticmethod
    def _hr_color(hr):
        """Return 'R' for hr > 180, 'Y' for 140 < hr <= 180, otherwise 'G'."""
        if hr > BaseStation.HR_RED_ABOVE:
            return 'R'
        if hr > BaseStation.HR_YELLOW_ABOVE:
            return 'Y'
        return 'G'

    @staticmethod
    def _bo_color(bo):
        """Return 'R' for bo < 80, 'Y' for 80 <= bo < 88, otherwise 'G'."""
        if bo < BaseStation.BO_RED_BELOW:
            return 'R'
        if bo < BaseStation.BO_YELLOW_BELOW:
            return 'Y'
        return 'G'

    def _refresh_readings(self, now):
        """Replace every record older than REFRESH_AFTER_S with fresh random values.

        ``now`` is the timestamp (``time.time()`` scale) to compare against;
        it is also stored as the new ``record_time`` of each refreshed record.
        """
        for record in self.HrBo_info.values():
            if now - record['record_time'] > self.REFRESH_AFTER_S:
                hr = random.randint(*self.FAKE_HR_RANGE)
                bo = random.randint(*self.FAKE_BO_RANGE)
                record['record_time'] = now
                record['hr'] = {'hr': hr, 'color': self._hr_color(hr)}
                record['bo'] = {'bo': "{:.2%}".format(bo / 100), 'color': self._bo_color(bo)}

    # Produce the hrbo readings.
    def get_information(self):
        """Run forever, refreshing stale records every LOOP_DELAY_S seconds.

        Never returns.  Exceptions raised by a pass are printed and the loop
        carries on.
        """
        while True:
            try:
                self._refresh_readings(time.time())
            except Exception:
                print(traceback.format_exc())
            # Deliberately outside the try: a pass that raises must still be
            # followed by the pause, otherwise a persistent error would turn
            # this loop into a busy spin that floods stdout with tracebacks.
            time.sleep(self.LOOP_DELAY_S)

    # Send the hrbo readings to the management side.
    def send_fake_information(self):
        """Run forever, sending ``HrBo_info`` via the client every 0.5 s.

        Each pass wraps the whole ``HrBo_info`` dict in a one-element package
        list and passes it to ``self.client.distributed_send`` with the single
        target address ``(self.ip, server_port)``.  Never returns; exceptions
        are printed and the loop carries on.
        """
        while True:
            try:
                time.sleep(self.LOOP_DELAY_S)
                broadcast_pkg = [{
                    "data": self.HrBo_info, 'ip': self.ip, 'data_type': 'hrbo_data'
                }]
                print(broadcast_pkg)
                # One address paired with one package list.
                address_list = [(self.ip, server_port)]
                pkg_list = [broadcast_pkg]
                self.client.distributed_send(address_list, pkg_list)
            except Exception:
                print(traceback.format_exc())
if __name__ == '__main__':
    # Script entry point: build the station (its constructor already starts
    # the sender thread), then run the reading-refresh loop in a second thread.
    basestation = BaseStation()
    refresh_thread = threading.Thread(target=basestation.get_information)
    refresh_thread.start()