LISHUZUOXUN_yangjiang/LSZXBackend/base_driver.py

308 lines
13 KiB
Python
Raw Normal View History

2024-09-23 14:54:15 +08:00
import time
import traceback
import requests
from flask import Response, jsonify, request
import Backend.format_backend
import UWB
from Backend.consensus import *
from DeviceDefine.consensus import UNKNOWN
from LSZXBackend.camera_streaming import base_camera
from LSZXBackend.general import *
from LSZXBackend.tag_control import *
from LSZXNetWork.lszx_network import Network
# from LSZXNetWork.network import Network
from MCamera.camera_test import camera_is_available
from MCamera.mp_camera import Camera
from Speaker import fake_speak_base
from Speaker.speak_base import SpeakServer
from Database.manager_database import ManagerDatabase
import UWB.positioning_standalone
import UWB.positioning_standalone_v2
import UWB.positioning_standalone_pd
import UWB.positioning_cluster
from Database.database_mgr import *
class BaseDriver(Backend.format_backend.Backend):
def __init__(
self, name, domain="0.0.0.0", port=0, master_mode=True,
positioning=True, camera=True, speaker=True, multi_positioning_mode=True, device_type=UNKNOWN,
pure_mode=False
):
print("系统启动,自检中……")
self.speak_driver = fake_speak_base.SpeakServer()
if speaker:
self.speak_driver = SpeakServer()
self.speak_driver.volume_control(1)
self.speak_driver.speed_control(200)
super().__init__(name, domain, port, debug=False)
# 作训系统数据中心
print("正在启动作训数据中心服务(1/5)")
# self.manager = ManagerDatabase()
self.manager = Database()
print("作训数据中心启动成功!(1/5)")
print("正在启动网络服务(2/5)")
self.connection = Network(master_mode=master_mode, device_type=device_type)
print("网络服务启动成功!(2/5)")
print("正在启动定位服务(3/5)")
self.positioning = None
self.multi_positioning_mode = multi_positioning_mode
if positioning:
if not multi_positioning_mode:
if pure_mode:
self.positioning = UWB.positioning_standalone_pd.Positioning()
else:
self.positioning = UWB.positioning_standalone_v2.Positioning()
else:
self.positioning = UWB.positioning_cluster.Positioning()
self.positioning.start()
self.positioning.resume()
print("定位服务启动成功!(3/5)")
print("正在启动摄像头服务(4/5)")
self.camera = None
if camera:
time.sleep(5)
# 检查摄像头是否可用,不可用则等待
while not camera_is_available():
beep(freq=300, duration=500)
time.sleep(1)
self.default_source = 0
Camera.add_source(source=self.default_source)
self.camera = Camera()
else:
print("无摄像头设备!")
print('摄像头启动成功(4/5)')
print("正在启动作训终端后台(5/5)")
print("作训终端后台启动成功!(5/5)")
self.speak_driver.add_speak("系统启动成功!欢迎使用砺戍智能作训系统!")
print("系统初始化完成!")
# 原始摄像头接口
self.register(name="raw_camera", func=self.pure_camera, methods=[REQUEST_MODE_GET])
# 手环控制服务
self.register(name="tag_control_server", func=self.tag_control_server, methods=[REQUEST_MODE_POST])
self.register(name="wifi_status", func=self.wifi_status, methods=[REQUEST_MODE_GET])
self.register(name="wifi_list", func=self.wifi_list, methods=[REQUEST_MODE_GET])
self.register(name="wifi_disconnect", func=self.wifi_disconnect, methods=[REQUEST_MODE_GET])
self.register(name="wifi_connect", func=self.wifi_connect, methods=[REQUEST_MODE_POST])
self.register(name="close_all_band", func=self.close_all_band, methods=[REQUEST_MODE_GET])
self.register(name="stop_all_band_alarm", func=self.stop_all_band_alarm, methods=[REQUEST_MODE_GET])
self.register(name="stop_band_alarm", func=self.stop_band_alarm, methods=[REQUEST_MODE_POST])
def pure_camera(self):
return Response(
base_camera(self.camera, self.default_source),
mimetype='multipart/x-mixed-replace; boundary=frame'
)
def get_band_id(self):
try:
lives_bands_id = get_tag_func(
speak_driver=self.speak_driver,
positioning=self.positioning
)
return jsonify({"band_id": lives_bands_id})
except Exception as e:
error_msg = {
STATUS: STATUS_UNKNOWN_ERROR,
ERROR_MESSAGE: f"发生错误,错误来源:{traceback.format_exc()}",
MSG: str(e)
}
return jsonify(error_msg)
def tag_control_server(self):
try:
data = request.get_json()
detected_tag = None
# 获取当前手环服务
if data[CMD] == CMD_GET_BAND_ID:
detected_tag = get_tag_func(
speak_driver=self.speak_driver,
positioning=self.positioning
)
# 关闭所有手环服务
elif data[CMD] == CMD_CLOSE_BAND:
close_all_band(
speak_driver=self.speak_driver,
positioning=self.positioning
)
# 关闭所有手环的警报
elif data[CMD] == CMD_STOP_ALARM_ALL:
close_all_band_alarm(
speak_driver=self.speak_driver,
positioning=self.positioning
)
# 关闭指定手环的警报
elif data[CMD] == CMD_STOP_ALARM:
tag_id = data[TAG]
stop_assign_alarm(
speak_driver=self.speak_driver,
positioning=self.positioning,
tag_id=tag_id
)
return jsonify({STATUS: STATUS_OK, DETECTED_TAG: detected_tag})
except Exception as e:
error_msg = {
STATUS: STATUS_UNKNOWN_ERROR,
ERROR_MESSAGE: f"后端系统错误{e.args},错误来源:{traceback.format_exc()}"
}
return jsonify(error_msg)
# 获得当前检测的手环
def get_band_id_passive(self):
try:
if self.connection.get_connected_wifi_name():
master_ip = self.connection.get_master_ip()
else:
error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
return jsonify(error_msg)
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"}
return jsonify(error_msg)
try:
response = requests.post(
url=f"http://{master_ip}:{TERMINAL}/tag_control_server",
json={CMD: CMD_GET_BAND_ID}
)
detected_tag = response.json()[DETECTED_TAG]
return jsonify({"band_id": detected_tag})
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR,
"error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
print(traceback.format_exc())
return jsonify(error_msg)
# 关闭所有手环
def close_all_band(self):
try:
if self.connection.get_connected_wifi_name():
master_ip = self.connection.get_master_ip()
else:
error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
return jsonify(error_msg)
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"}
return jsonify(error_msg)
try:
requests.post(
url=f"http://{master_ip}:{TERMINAL}/tag_control_server",
json={CMD: CMD_CLOSE_BAND}
)
return jsonify({"status": STATUS_OK})
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR,
"error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
print(traceback.format_exc())
return jsonify(error_msg)
# 关闭所有手环警报
def stop_all_band_alarm(self):
try:
if self.connection.get_connected_wifi_name():
master_ip = self.connection.get_master_ip()
else:
error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
return jsonify(error_msg)
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"}
return jsonify(error_msg)
try:
requests.post(
url=f"http://{master_ip}:45678/tag_control_server",
json={CMD: CMD_STOP_ALARM_ALL}
)
return jsonify({"status": STATUS_OK})
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR,
"error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
print(traceback.format_exc())
return jsonify(error_msg)
# 关闭手环警报
def stop_band_alarm(self):
data = request.json
band_id = data.get(BAND_ID)
try:
if self.connection.get_connected_wifi_name():
master_ip = self.connection.get_master_ip()
else:
error_msg = {"status": CONNECTION_ERROR, "error_message": "网络未链接!"}
return jsonify(error_msg)
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"}
return jsonify(error_msg)
try:
requests.post(
url=f"http://{master_ip}:45678/tag_control_server",
json={CMD: CMD_STOP_ALARM_ALL, TAG: band_id}
)
return jsonify({"status": STATUS_OK})
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR,
"error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
return jsonify(error_msg)
# 查看当前链接的WiFi
def wifi_status(self):
try:
response_data = self.connection.get_connected_wifi()
if response_data:
is_connected = True
else:
is_connected = False
wifi_mes = {
"is_connected": is_connected,
"wifi_name": response_data
}
return jsonify(wifi_mes)
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR,
"error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
return jsonify(error_msg)
# 查看当前链接的WiFi
def wifi_list(self):
try:
response_data = self.connection.get_available_wifi()
return jsonify(response_data)
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR,
"error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
return jsonify(error_msg)
# 断开wifi链接
def wifi_disconnect(self):
try:
self.connection.disconnect()
self.speak_driver.add_speak("已成功断开WiFi链接")
return jsonify({"status": STATUS_OK})
except Exception as e:
error_msg = {"status": STATUS_UNKNOWN_ERROR,
"error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)}
return jsonify(error_msg)
# 链接指定WiFi
def wifi_connect(self):
try:
data = request.json
wifi_name = data.get("name")
if wifi_name:
self.connection.disconnect()
if self.connection.connect2wifi(ssid=wifi_name):
self.speak_driver.add_speak("WiFi链接成功")
return jsonify({"status": STATUS_OK})
else:
self.speak_driver.add_speak("WiFi链接失败")
return jsonify({"status": ACTION_FAIL, "error_message": "WiFi链接失败"})
else:
self.speak_driver.add_speak("WiFi链接失败WiFi名称为空")
return jsonify({"status": ACTION_FAIL, "error_message": "WiFi名称为空"})
except Exception as e:
self.speak_driver.add_speak("WiFi链接失败")
error_msg = {
"status": STATUS_UNKNOWN_ERROR, "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(e)
}
return jsonify(error_msg)