LISHUZUOXUN_yangjiang/LSZXBackend/base_driver.py

308 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import traceback
import requests
from flask import Response, jsonify, request
import Backend.format_backend
import UWB
from Backend.consensus import *
from DeviceDefine.consensus import UNKNOWN
from LSZXBackend.camera_streaming import base_camera
from LSZXBackend.general import *
from LSZXBackend.tag_control import *
from LSZXNetWork.lszx_network import Network
# from LSZXNetWork.network import Network
from MCamera.camera_test import camera_is_available
from MCamera.mp_camera import Camera
from Speaker import fake_speak_base
from Speaker.speak_base import SpeakServer
from Database.manager_database import ManagerDatabase
import UWB.positioning_standalone
import UWB.positioning_standalone_v2
import UWB.positioning_standalone_pd
import UWB.positioning_cluster
from Database.database_mgr import *
class BaseDriver(Backend.format_backend.Backend):
    """Base backend driver for a training terminal.

    On construction it brings up, in order: the speech driver, the data
    store, the network layer, the (optional) UWB positioning service and the
    (optional) camera, then registers the HTTP handlers shared by all
    terminals through ``self.register`` (inherited from the base backend).

    Handlers return ``flask`` responses. Failures are reported as JSON error
    payloads rather than raised, so a handler never propagates an exception
    to the web framework.
    """

    # ``stop_all_band_alarm`` and ``stop_band_alarm`` historically addressed
    # the master on this literal port, while the other forwarding handlers
    # use ``TERMINAL``.
    # NOTE(review): confirm whether TERMINAL == 45678 and unify if so.
    _LEGACY_TAG_CONTROL_PORT = 45678

    def __init__(
            self, name, domain="0.0.0.0", port=0, master_mode=True,
            positioning=True, camera=True, speaker=True, multi_positioning_mode=True, device_type=UNKNOWN,
            pure_mode=False
    ):
        """Start every subsystem and register the common routes.

        Args:
            name: Forwarded to the base backend constructor.
            domain: Forwarded to the base backend constructor.
            port: Forwarded to the base backend constructor.
            master_mode: Forwarded to ``Network``.
            positioning: When false, no positioning service is created and
                ``self.positioning`` stays ``None``.
            camera: When false, no camera is opened and ``self.camera``
                stays ``None``.
            speaker: When false, the fake speech driver is kept instead of
                the real ``SpeakServer``.
            multi_positioning_mode: Selects the cluster positioning
                implementation; otherwise a standalone one is used.
            device_type: Forwarded to ``Network``.
            pure_mode: In standalone mode, selects the ``_pd`` positioning
                implementation instead of ``_v2``.
        """
        print("系统启动,自检中……")

        # Start from the fake driver so ``speak_driver`` is always usable,
        # then swap in the real one on request.
        self.speak_driver = fake_speak_base.SpeakServer()
        if speaker:
            self.speak_driver = SpeakServer()
            self.speak_driver.volume_control(1)
            self.speak_driver.speed_control(200)

        super().__init__(name, domain, port, debug=False)

        # Step 1/5: training data store.
        print("正在启动作训数据中心服务(1/5)")
        # Previously ManagerDatabase(); ``Database`` is presumably supplied by
        # the ``Database.database_mgr`` star import - TODO confirm.
        self.manager = Database()
        print("作训数据中心启动成功!(1/5)")

        # Step 2/5: network layer.
        print("正在启动网络服务(2/5)")
        self.connection = Network(master_mode=master_mode, device_type=device_type)
        print("网络服务启动成功!(2/5)")

        # Step 3/5: positioning service.
        print("正在启动定位服务(3/5)")
        self.positioning = None
        self.multi_positioning_mode = multi_positioning_mode
        if positioning:
            if multi_positioning_mode:
                self.positioning = UWB.positioning_cluster.Positioning()
            elif pure_mode:
                self.positioning = UWB.positioning_standalone_pd.Positioning()
            else:
                self.positioning = UWB.positioning_standalone_v2.Positioning()
            self.positioning.start()
            self.positioning.resume()
        print("定位服务启动成功!(3/5)")

        # Step 4/5: camera.
        print("正在启动摄像头服务(4/5)")
        self.camera = None
        # Always defined so ``pure_camera`` cannot hit AttributeError on a
        # device started with camera=False.
        self.default_source = 0
        if camera:
            time.sleep(5)
            # Block (beeping once per second) until the camera is available.
            while not camera_is_available():
                beep(freq=300, duration=500)
                time.sleep(1)
            Camera.add_source(source=self.default_source)
            self.camera = Camera()
            print('摄像头启动成功(4/5)')
        else:
            print("无摄像头设备!")

        # Step 5/5: terminal backend itself.
        print("正在启动作训终端后台(5/5)")
        print("作训终端后台启动成功!(5/5)")
        self.speak_driver.add_speak("系统启动成功!欢迎使用砺戍智能作训系统!")
        print("系统初始化完成!")

        # Raw camera stream.
        self.register(name="raw_camera", func=self.pure_camera, methods=[REQUEST_MODE_GET])
        # Band (tag) control service and the handlers that forward to it.
        self.register(name="tag_control_server", func=self.tag_control_server, methods=[REQUEST_MODE_POST])
        self.register(name="wifi_status", func=self.wifi_status, methods=[REQUEST_MODE_GET])
        self.register(name="wifi_list", func=self.wifi_list, methods=[REQUEST_MODE_GET])
        self.register(name="wifi_disconnect", func=self.wifi_disconnect, methods=[REQUEST_MODE_GET])
        self.register(name="wifi_connect", func=self.wifi_connect, methods=[REQUEST_MODE_POST])
        self.register(name="close_all_band", func=self.close_all_band, methods=[REQUEST_MODE_GET])
        self.register(name="stop_all_band_alarm", func=self.stop_all_band_alarm, methods=[REQUEST_MODE_GET])
        self.register(name="stop_band_alarm", func=self.stop_band_alarm, methods=[REQUEST_MODE_POST])

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _unknown_error(exc):
        """Build the generic JSON error reply for an unexpected exception."""
        return jsonify({
            "status": STATUS_UNKNOWN_ERROR,
            "error_message": "出现错误,请联系后端开发人员!错误原因:{}".format(exc)
        })

    def _resolve_master_ip(self):
        """Look up the master's IP through the network layer.

        Returns:
            ``(master_ip, None)`` when a WiFi name is reported and the lookup
            succeeds, otherwise ``(None, error_response)`` where
            ``error_response`` is the JSON reply to send back.
        """
        try:
            if self.connection.get_connected_wifi_name():
                return self.connection.get_master_ip(), None
            return None, jsonify({"status": CONNECTION_ERROR, "error_message": "网络未链接!"})
        except Exception as e:
            return None, jsonify({"status": STATUS_UNKNOWN_ERROR, "error_message": f"网络未链接!{e.args}"})

    def _post_tag_command(self, payload, port):
        """POST ``payload`` to the master's ``tag_control_server`` endpoint.

        Args:
            payload: JSON-serialisable command dictionary.
            port: Port of the master's backend.

        Returns:
            ``(reply, None)`` with the ``requests`` response on success, or
            ``(None, error_response)`` when the master cannot be resolved or
            the request raises.
        """
        master_ip, failure = self._resolve_master_ip()
        if failure is not None:
            return None, failure
        try:
            # NOTE(review): no timeout is set, so an unresponsive master
            # blocks this handler; pick a value once the slowest command's
            # duration is known.
            reply = requests.post(
                url=f"http://{master_ip}:{port}/tag_control_server",
                json=payload
            )
        except Exception as e:
            print(traceback.format_exc())
            return None, self._unknown_error(e)
        return reply, None

    # ------------------------------------------------------------------
    # Camera
    # ------------------------------------------------------------------
    def pure_camera(self):
        """Stream the default camera source as a multipart response."""
        return Response(
            base_camera(self.camera, self.default_source),
            mimetype='multipart/x-mixed-replace; boundary=frame'
        )

    # ------------------------------------------------------------------
    # Band (tag) handling on this device
    # ------------------------------------------------------------------
    def get_band_id(self):
        """Return the bands reported by ``get_tag_func`` as ``{"band_id": ...}``.

        Any exception is converted into a JSON error payload carrying the
        formatted traceback and the exception text.
        """
        try:
            live_band_ids = get_tag_func(
                speak_driver=self.speak_driver,
                positioning=self.positioning
            )
            return jsonify({"band_id": live_band_ids})
        except Exception as e:
            return jsonify({
                STATUS: STATUS_UNKNOWN_ERROR,
                ERROR_MESSAGE: f"发生错误,错误来源:{traceback.format_exc()}",
                MSG: str(e)
            })

    def tag_control_server(self):
        """Execute the band command carried in the request's JSON body.

        The body must contain ``CMD``; ``CMD_STOP_ALARM`` additionally reads
        ``TAG``. The reply always carries ``DETECTED_TAG``, which is only
        non-``None`` for ``CMD_GET_BAND_ID``. An unrecognised command is
        answered with ``STATUS_OK`` and no action, as before.
        """
        try:
            data = request.get_json()
            command = data[CMD]
            detected_tag = None
            # Inside a method the bare names below resolve to the module-level
            # helpers, not to the same-named methods of this class.
            if command == CMD_GET_BAND_ID:
                # Report the currently detected band(s).
                detected_tag = get_tag_func(
                    speak_driver=self.speak_driver,
                    positioning=self.positioning
                )
            elif command == CMD_CLOSE_BAND:
                # Shut down every band.
                close_all_band(
                    speak_driver=self.speak_driver,
                    positioning=self.positioning
                )
            elif command == CMD_STOP_ALARM_ALL:
                # Silence the alarm on every band.
                close_all_band_alarm(
                    speak_driver=self.speak_driver,
                    positioning=self.positioning
                )
            elif command == CMD_STOP_ALARM:
                # Silence the alarm on one specific band.
                stop_assign_alarm(
                    speak_driver=self.speak_driver,
                    positioning=self.positioning,
                    tag_id=data[TAG]
                )
            return jsonify({STATUS: STATUS_OK, DETECTED_TAG: detected_tag})
        except Exception as e:
            return jsonify({
                STATUS: STATUS_UNKNOWN_ERROR,
                ERROR_MESSAGE: f"后端系统错误{e.args},错误来源:{traceback.format_exc()}"
            })

    # ------------------------------------------------------------------
    # Band (tag) handling forwarded to the master
    # ------------------------------------------------------------------
    def get_band_id_passive(self):
        """Ask the master for the detected band and relay it as ``band_id``."""
        reply, failure = self._post_tag_command({CMD: CMD_GET_BAND_ID}, TERMINAL)
        if failure is not None:
            return failure
        try:
            return jsonify({"band_id": reply.json()[DETECTED_TAG]})
        except Exception as e:
            # Covers a non-JSON reply or an error payload without DETECTED_TAG.
            print(traceback.format_exc())
            return self._unknown_error(e)

    def close_all_band(self):
        """Ask the master to shut down every band."""
        _, failure = self._post_tag_command({CMD: CMD_CLOSE_BAND}, TERMINAL)
        if failure is not None:
            return failure
        return jsonify({"status": STATUS_OK})

    def stop_all_band_alarm(self):
        """Ask the master to silence the alarm on every band."""
        _, failure = self._post_tag_command(
            {CMD: CMD_STOP_ALARM_ALL}, self._LEGACY_TAG_CONTROL_PORT
        )
        if failure is not None:
            return failure
        return jsonify({"status": STATUS_OK})

    def stop_band_alarm(self):
        """Ask the master to silence the alarm on the band named in the request.

        The JSON body must carry ``BAND_ID``. It is forwarded as ``TAG`` with
        ``CMD_STOP_ALARM`` - the command under which ``tag_control_server``
        reads ``TAG`` - so only that band is affected.
        """
        try:
            band_id = request.json.get(BAND_ID)
        except Exception as e:
            # Missing or malformed JSON body.
            return self._unknown_error(e)
        if band_id is None:
            return jsonify({"status": ACTION_FAIL, "error_message": "手环ID为空"})

        _, failure = self._post_tag_command(
            {CMD: CMD_STOP_ALARM, TAG: band_id}, self._LEGACY_TAG_CONTROL_PORT
        )
        if failure is not None:
            return failure
        return jsonify({"status": STATUS_OK})

    # ------------------------------------------------------------------
    # WiFi management
    # ------------------------------------------------------------------
    def wifi_status(self):
        """Report the value of ``connection.get_connected_wifi()``.

        The reply holds that value under ``wifi_name`` and its truthiness
        under ``is_connected``.
        """
        try:
            connected_wifi = self.connection.get_connected_wifi()
            return jsonify({
                "is_connected": bool(connected_wifi),
                "wifi_name": connected_wifi
            })
        except Exception as e:
            return self._unknown_error(e)

    def wifi_list(self):
        """Return the result of ``connection.get_available_wifi()`` as JSON."""
        try:
            return jsonify(self.connection.get_available_wifi())
        except Exception as e:
            return self._unknown_error(e)

    def wifi_disconnect(self):
        """Disconnect from the current WiFi and announce it by voice."""
        try:
            self.connection.disconnect()
            self.speak_driver.add_speak("已成功断开WiFi链接")
            return jsonify({"status": STATUS_OK})
        except Exception as e:
            return self._unknown_error(e)

    def wifi_connect(self):
        """Connect to the WiFi whose SSID is given as ``name`` in the JSON body.

        Any existing connection is dropped first. The outcome is announced by
        voice and returned as JSON.
        """
        try:
            wifi_name = request.json.get("name")
            if not wifi_name:
                self.speak_driver.add_speak("WiFi链接失败WiFi名称为空")
                return jsonify({"status": ACTION_FAIL, "error_message": "WiFi名称为空"})

            self.connection.disconnect()
            if self.connection.connect2wifi(ssid=wifi_name):
                self.speak_driver.add_speak("WiFi链接成功")
                return jsonify({"status": STATUS_OK})

            self.speak_driver.add_speak("WiFi链接失败")
            return jsonify({"status": ACTION_FAIL, "error_message": "WiFi链接失败"})
        except Exception as e:
            self.speak_driver.add_speak("WiFi链接失败")
            return self._unknown_error(e)