LISHUZUOXUN_yangjiang/PureBackend/base_driver.py

224 lines
9.4 KiB
Python

# coding: gb2312
import UWB
import UWB.positioning_cluster
import UWB.positioning_standalone
import UWB.positioning_standalone_pd
import UWB.positioning_standalone_v2
from Database.database_pyqt import *
from AcrossPlatform.get_platform import *
from DeviceDefine.consensus import UNKNOWN
from LSZXBackend.tag_control import *
from LSZXNetWork.lszx_network import Network
from LogRecord.log_recorder import GLOBAL_LOG
from Speaker import fake_speak_base
from Speaker.speak_base import SpeakServer
from LSZXVideo.video_recording import VideoRecording
from LSZXVideo.video_sender import VideoSending
# Values accepted by BaseDriver's `model` argument; they choose between the
# MCamera.mp_camera and MCamera.mn_camera camera implementations.
MODEL_MEDIAPIPE = "mediapipe"
MODEL_MOVE_NET = "move_net"
class BaseDriver:
def __init__(
self, master_mode=True, positioning=True, camera=True, model=MODEL_MEDIAPIPE, speaker=True,
multi_positioning_mode=True, device_type=UNKNOWN, pure_mode=False
):
# Bring up the services this driver owns, in order: speech, database,
# network, positioning, camera and video, writing progress to GLOBAL_LOG.
#
# Parameters:
#   master_mode, device_type: passed straight through to Network(...).
#   positioning: when truthy, a UWB Positioning object is created.
#   multi_positioning_mode: stored on self; chooses UWB.positioning_cluster
#       over the standalone implementations.
#   pure_mode: in standalone mode, chooses positioning_standalone_pd over
#       positioning_standalone_v2.
#   camera: when truthy, a camera is set up with source 0 as the default.
#   model: MODEL_MEDIAPIPE or MODEL_MOVE_NET; chooses MCamera.mp_camera or
#       MCamera.mn_camera.
#   speaker: when truthy, the fake speech server is replaced by SpeakServer.
#
# NOTE(review): this copy of the file has lost its indentation, so the exact
# nesting of the branches below cannot be read from here. Confirm it against
# the original before relying on which statements are conditional.
GLOBAL_LOG.write("系统启动,自检中……", need_print=True)
# Start with the fake speech server so self.speak_driver is always set.
self.speak_driver = fake_speak_base.SpeakServer()
if speaker:
self.speak_driver = SpeakServer()
# NOTE(review): unclear whether the two settings below are applied only to
# the real SpeakServer or to the fake one as well — confirm.
self.speak_driver.volume_control(1)
self.speak_driver.speed_control(200)
# Training-system data center
GLOBAL_LOG.write("正在启动作训数据中心服务(1/5)", need_print=True)
self.manager = Database()
GLOBAL_LOG.write("作训数据中心启动成功!(1/5)", need_print=True)
GLOBAL_LOG.write("正在启动网络服务(2/5)", need_print=True)
self.connection = Network(master_mode=master_mode, device_type=device_type)
GLOBAL_LOG.write("网络服务启动成功!(2/5)", need_print=True)
GLOBAL_LOG.write("正在启动定位服务(3/5)", need_print=True)
# Remains None when positioning is disabled.
self.positioning = None
self.multi_positioning_mode = multi_positioning_mode
if positioning:
if not multi_positioning_mode:
if pure_mode:
self.positioning = UWB.positioning_standalone_pd.Positioning()
else:
self.positioning = UWB.positioning_standalone_v2.Positioning()
else:
self.positioning = UWB.positioning_cluster.Positioning()
# NOTE(review): presumably start()/resume() run for every positioning mode,
# not only the cluster branch — confirm against the original nesting.
self.positioning.start()
self.positioning.resume()
GLOBAL_LOG.write("定位服务启动成功!(3/5)", need_print=True)
GLOBAL_LOG.write("正在启动摄像头服务(4/5)", need_print=True)
# Remains None unless one of the camera branches below assigns it.
self.camera = None
if camera:
# time.sleep(5)
# # Check whether the camera is available; wait if it is not
# while not camera_is_available():
# beep(freq=300, duration=500)
# time.sleep(1)
self.default_source = 0
if model == MODEL_MEDIAPIPE:
# Imported only when this backend is selected.
import MCamera.mp_camera
MCamera.mp_camera.Camera.add_source(source=self.default_source)
self.camera = MCamera.mp_camera.Camera()
elif model == MODEL_MOVE_NET:
# Imported only when this backend is selected.
import MCamera.mn_camera
MCamera.mn_camera.Camera.add_source(source=self.default_source)
self.camera = MCamera.mn_camera.Camera()
# NOTE(review): unclear whether this else pairs with `if camera:` or with the
# model check above — confirm.
else:
GLOBAL_LOG.write("无摄像头设备!", need_print=True)
GLOBAL_LOG.write('摄像头启动成功(4/5)', need_print=True)
GLOBAL_LOG.write("正在启动作训终端后台(5/5)", need_print=True)
if SYS_PLATFORM == WINDOWS:
# Prepare video recording
self.video_recorder = VideoRecording()
self.video_recorder.start()
# Video sending
# NOTE(review): unclear whether this is inside the Windows-only branch — confirm.
self.video_sending = VideoSending()
GLOBAL_LOG.write("作训终端后台启动成功!(5/5)", need_print=True)
self.speak_driver.add_speak("系统启动成功!欢迎使用砺戍智能作训系统!")
GLOBAL_LOG.write("系统初始化完成!", need_print=True)
def pure_camera(self):
    """Return the frame the camera reports for the default source.

    Delegates to ``self.camera.get_frame`` with ``self.default_source`` and
    hands back its result unchanged.
    """
    # Resolve the bound method first so attribute lookups happen in the same
    # order as a direct ``self.camera.get_frame(self.default_source)`` call.
    grab_frame = self.camera.get_frame
    return grab_frame(self.default_source)
def get_band_id(self):
    """Return the band id(s) reported by ``get_tag_func``.

    ``get_tag_func`` is called with this driver's speech and positioning
    services.

    Returns:
        ``{"band_id": <value returned by get_tag_func>}`` on success, or
        ``None`` when any exception is raised; the error message and
        traceback are written to ``GLOBAL_LOG`` in that case.
    """
    # Imported locally: this module has no explicit ``import traceback`` and
    # would otherwise rely on a wildcard import re-exporting the name. Without
    # it the except handler itself could fail with NameError and hide the
    # original error.
    import traceback

    try:
        lives_bands_id = get_tag_func(
            speak_driver=self.speak_driver,
            positioning=self.positioning
        )
    except Exception as e:
        # Best-effort: log and signal failure with None rather than raising.
        GLOBAL_LOG.write(f"发生错误,{str(e)},错误来源:{traceback.format_exc()}")
        return None
    return {"band_id": lives_bands_id}
# # TODO: to be fixed
# def tag_control_server(self, data):
# try:
# detected_tag = None
# # 获取当前手环服务
# if data[CMD] == CMD_GET_BAND_ID:
# detected_tag = get_tag_func(
# speak_driver=self.speak_driver,
# positioning=self.positioning
# )
# # 关闭所有手环服务
# elif data[CMD] == CMD_CLOSE_BAND:
# close_all_band(
# speak_driver=self.speak_driver,
# positioning=self.positioning
# )
# # 关闭所有手环的警报
# elif data[CMD] == CMD_STOP_ALARM_ALL:
# close_all_band_alarm(
# speak_driver=self.speak_driver,
# positioning=self.positioning
# )
# # 关闭指定手环的警报
# elif data[CMD] == CMD_STOP_ALARM:
# tag_id = data[TAG]
# stop_assign_alarm(
# speak_driver=self.speak_driver,
# positioning=self.positioning,
# tag_id=tag_id
# )
# return detected_tag
# except Exception as e:
# GLOBAL_LOG.write(f"发生错误,{str(e)},错误来源:{traceback.format_exc()}")
#
# # 获得当前检测的手环
# def get_band_id_passive(self):
# try:
# if self.connection.get_connected_wifi_name():
# master_ip = self.connection.get_master_ip()
# else:
# error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
# return jsonify(error_msg)
# except Exception as e:
# GLOBAL_LOG.write(f"发生错误,{str(e)},错误来源:{traceback.format_exc()}")
# try:
# response = requests.post(
# url=f"http://{master_ip}:{TERMINAL}/tag_control_server",
# json={CMD: CMD_GET_BAND_ID}
# )
# detected_tag = response.json()[DETECTED_TAG]
# return jsonify({"band_id": detected_tag})
# except Exception as e:
# GLOBAL_LOG.write(f"发生错误,{str(e)},错误来源:{traceback.format_exc()}")
#
# # 关闭所有手环
# def close_all_band(self):
# try:
# if self.connection.get_connected_wifi_name():
# master_ip = self.connection.get_master_ip()
# else:
# error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
# return jsonify(error_msg)
# except Exception as e:
# error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"}
# return jsonify(error_msg)
# try:
# requests.post(
# url=f"http://{master_ip}:{TERMINAL}/tag_control_server",
# json={CMD: CMD_CLOSE_BAND}
# )
# return jsonify({"status": STATUS_OK})
# except Exception as e:
# error_msg = {"status": STATUS_UNKNOWN_ERROR,
# "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
# print(traceback.format_exc())
# return jsonify(error_msg)
#
# # 关闭所有手环警报
# def stop_all_band_alarm(self):
# try:
# if self.connection.get_connected_wifi_name():
# master_ip = self.connection.get_master_ip()
# else:
# error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
# return jsonify(error_msg)
# except Exception as e:
# error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"}
# return jsonify(error_msg)
# try:
# requests.post(
# url=f"http://{master_ip}:45678/tag_control_server",
# json={CMD: CMD_STOP_ALARM_ALL}
# )
# return jsonify({"status": STATUS_OK})
# except Exception as e:
# error_msg = {"status": STATUS_UNKNOWN_ERROR,
# "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
# print(traceback.format_exc())
# return jsonify(error_msg)
#
# # 关闭手环警报
# def stop_band_alarm(self):
# data = request.json
# band_id = data.get(BAND_ID)
# try:
# if self.connection.get_connected_wifi_name():
# master_ip = self.connection.get_master_ip()
# else:
# error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
# return jsonify(error_msg)
# except Exception as e:
# error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"}
# return jsonify(error_msg)
# try:
# requests.post(
# url=f"http://{master_ip}:45678/tag_control_server",
# json={CMD: CMD_STOP_ALARM_ALL, TAG: band_id}
# )
# return jsonify({"status": STATUS_OK})
# except Exception as e:
# error_msg = {"status": STATUS_UNKNOWN_ERROR,
# "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
# return jsonify(error_msg)