import time import traceback import requests from flask import Response, jsonify, request import Backend.format_backend import UWB from Backend.consensus import * from DeviceDefine.consensus import UNKNOWN from LSZXBackend.camera_streaming import base_camera from LSZXBackend.general import * from LSZXBackend.tag_control import * from LSZXNetWork.lszx_network import Network # from LSZXNetWork.network import Network from MCamera.camera_test import camera_is_available from MCamera.mp_camera import Camera from Speaker import fake_speak_base from Speaker.speak_base import SpeakServer from Database.manager_database import ManagerDatabase import UWB.positioning_standalone import UWB.positioning_standalone_v2 import UWB.positioning_standalone_pd import UWB.positioning_cluster from Database.database_mgr import * class BaseDriver(Backend.format_backend.Backend): def __init__( self, name, domain="0.0.0.0", port=0, master_mode=True, positioning=True, camera=True, speaker=True, multi_positioning_mode=True, device_type=UNKNOWN, pure_mode=False ): print("系统启动,自检中……") self.speak_driver = fake_speak_base.SpeakServer() if speaker: self.speak_driver = SpeakServer() self.speak_driver.volume_control(1) self.speak_driver.speed_control(200) super().__init__(name, domain, port, debug=False) # 作训系统数据中心 print("正在启动作训数据中心服务(1/5)") # self.manager = ManagerDatabase() self.manager = Database() print("作训数据中心启动成功!(1/5)") print("正在启动网络服务(2/5)") self.connection = Network(master_mode=master_mode, device_type=device_type) print("网络服务启动成功!(2/5)") print("正在启动定位服务(3/5)") self.positioning = None self.multi_positioning_mode = multi_positioning_mode if positioning: if not multi_positioning_mode: if pure_mode: self.positioning = UWB.positioning_standalone_pd.Positioning() else: self.positioning = UWB.positioning_standalone_v2.Positioning() else: self.positioning = UWB.positioning_cluster.Positioning() self.positioning.start() self.positioning.resume() print("定位服务启动成功!(3/5)") print("正在启动摄像头服务(4/5)") self.camera = None if camera: time.sleep(5) # 检查摄像头是否可用,不可用则等待 while not camera_is_available(): beep(freq=300, duration=500) time.sleep(1) self.default_source = 0 Camera.add_source(source=self.default_source) self.camera = Camera() else: print("无摄像头设备!") print('摄像头启动成功(4/5)') print("正在启动作训终端后台(5/5)") print("作训终端后台启动成功!(5/5)") self.speak_driver.add_speak("系统启动成功!欢迎使用砺戍智能作训系统!") print("系统初始化完成!") # 原始摄像头接口 self.register(name="raw_camera", func=self.pure_camera, methods=[REQUEST_MODE_GET]) # 手环控制服务 self.register(name="tag_control_server", func=self.tag_control_server, methods=[REQUEST_MODE_POST]) self.register(name="wifi_status", func=self.wifi_status, methods=[REQUEST_MODE_GET]) self.register(name="wifi_list", func=self.wifi_list, methods=[REQUEST_MODE_GET]) self.register(name="wifi_disconnect", func=self.wifi_disconnect, methods=[REQUEST_MODE_GET]) self.register(name="wifi_connect", func=self.wifi_connect, methods=[REQUEST_MODE_POST]) self.register(name="close_all_band", func=self.close_all_band, methods=[REQUEST_MODE_GET]) self.register(name="stop_all_band_alarm", func=self.stop_all_band_alarm, methods=[REQUEST_MODE_GET]) self.register(name="stop_band_alarm", func=self.stop_band_alarm, methods=[REQUEST_MODE_POST]) def pure_camera(self): return Response( base_camera(self.camera, self.default_source), mimetype='multipart/x-mixed-replace; boundary=frame' ) def get_band_id(self): try: lives_bands_id = get_tag_func( speak_driver=self.speak_driver, positioning=self.positioning ) return jsonify({"band_id": lives_bands_id}) except Exception as e: error_msg = { STATUS: STATUS_UNKNOWN_ERROR, ERROR_MESSAGE: f"发生错误,错误来源:{traceback.format_exc()}", MSG: str(e) } return jsonify(error_msg) def tag_control_server(self): try: data = request.get_json() detected_tag = None # 获取当前手环服务 if data[CMD] == CMD_GET_BAND_ID: detected_tag = get_tag_func( speak_driver=self.speak_driver, positioning=self.positioning ) # 关闭所有手环服务 elif data[CMD] == CMD_CLOSE_BAND: close_all_band( speak_driver=self.speak_driver, positioning=self.positioning ) # 关闭所有手环的警报 elif data[CMD] == CMD_STOP_ALARM_ALL: close_all_band_alarm( speak_driver=self.speak_driver, positioning=self.positioning ) # 关闭指定手环的警报 elif data[CMD] == CMD_STOP_ALARM: tag_id = data[TAG] stop_assign_alarm( speak_driver=self.speak_driver, positioning=self.positioning, tag_id=tag_id ) return jsonify({STATUS: STATUS_OK, DETECTED_TAG: detected_tag}) except Exception as e: error_msg = { STATUS: STATUS_UNKNOWN_ERROR, ERROR_MESSAGE: f"后端系统错误{e.args},错误来源:{traceback.format_exc()}" } return jsonify(error_msg) # 获得当前检测的手环 def get_band_id_passive(self): try: if self.connection.get_connected_wifi_name(): master_ip = self.connection.get_master_ip() else: error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"} return jsonify(error_msg) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"} return jsonify(error_msg) try: response = requests.post( url=f"http://{master_ip}:{TERMINAL}/tag_control_server", json={CMD: CMD_GET_BAND_ID} ) detected_tag = response.json()[DETECTED_TAG] return jsonify({"band_id": detected_tag}) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)} print(traceback.format_exc()) return jsonify(error_msg) # 关闭所有手环 def close_all_band(self): try: if self.connection.get_connected_wifi_name(): master_ip = self.connection.get_master_ip() else: error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"} return jsonify(error_msg) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"} return jsonify(error_msg) try: requests.post( url=f"http://{master_ip}:{TERMINAL}/tag_control_server", json={CMD: CMD_CLOSE_BAND} ) return jsonify({"status": STATUS_OK}) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)} print(traceback.format_exc()) return jsonify(error_msg) # 关闭所有手环警报 def stop_all_band_alarm(self): try: if self.connection.get_connected_wifi_name(): master_ip = self.connection.get_master_ip() else: error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"} return jsonify(error_msg) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"} return jsonify(error_msg) try: requests.post( url=f"http://{master_ip}:45678/tag_control_server", json={CMD: CMD_STOP_ALARM_ALL} ) return jsonify({"status": STATUS_OK}) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)} print(traceback.format_exc()) return jsonify(error_msg) # 关闭手环警报 def stop_band_alarm(self): data = request.json band_id = data.get(BAND_ID) try: if self.connection.get_connected_wifi_name(): master_ip = self.connection.get_master_ip() else: error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"} return jsonify(error_msg) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"} return jsonify(error_msg) try: requests.post( url=f"http://{master_ip}:45678/tag_control_server", json={CMD: CMD_STOP_ALARM_ALL, TAG: band_id} ) return jsonify({"status": STATUS_OK}) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)} return jsonify(error_msg) # 查看当前链接的WiFi def wifi_status(self): try: response_data = self.connection.get_connected_wifi() if response_data: is_connected = True else: is_connected = False wifi_mes = { "is_connected": is_connected, "wifi_name": response_data } return jsonify(wifi_mes) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)} return jsonify(error_msg) # 查看当前链接的WiFi def wifi_list(self): try: response_data = self.connection.get_available_wifi() return jsonify(response_data) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)} return jsonify(error_msg) # 断开wifi链接 def wifi_disconnect(self): try: self.connection.disconnect() self.speak_driver.add_speak("已成功断开WiFi链接!") return jsonify({"status": STATUS_OK}) except Exception as e: error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)} return jsonify(error_msg) # 链接指定WiFi def wifi_connect(self): try: data = request.json wifi_name = data.get("name") if wifi_name: self.connection.disconnect() if self.connection.connect2wifi(ssid=wifi_name): self.speak_driver.add_speak("WiFi链接成功!") return jsonify({"status": STATUS_OK}) else: self.speak_driver.add_speak("WiFi链接失败!") return jsonify({"status": ACTION_FAIL, "error_message": "WiFi链接失败"}) else: self.speak_driver.add_speak("WiFi链接失败!WiFi名称为空") return jsonify({"status": ACTION_FAIL, "error_message": "WiFi名称为空"}) except Exception as e: self.speak_driver.add_speak("WiFi链接失败!") error_msg = { "status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e) } return jsonify(error_msg)