LISHUZUOXUN_yangjiang/UWB/positioning_standalone_v2.py

250 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=gb2312
from threading import Thread
from UWB.consensus import *
from UWB.multi_uwb import MultiUWB
from UWB.uwb_driver import *
TYPE_TYPE = "type"
TYPE_DATA = "data"
TYPE_ANCHOR = "anchor"
TYPE_TIME = "time"
TYPE_VALID = "valid"
TYPE_RESULT = "result"
TYPE_START = "start"
TYPE_END = "end"
TYPE_TARGET = "target"
TYPE_DISTANCE = "distance"
TYPE_SUCCESS_TIME = "success_time"
TYPE_RECORD_TIME = "record_time"
CMD_SET_MODE = "set_mode"
CMD_SET_MODE_RESULT = "set_mode_result"
CMD_REPORT = "report"
CMD_CLEAR_DATA = "clear_data"
CMD_CLEAR_DATA_RESULT = "clear_data_result"
CMD_REPORT_DATA = "report_data"
CMD_REPORT_RESULT = "report_result"
CMD_GET_INFO = "get_info"
CMD_INFO = "info"
CMD_TWR_REQUEST = "twr_request"
CMD_TWR_RESULT = "twr_result"
class Positioning(threading.Thread):
def __init__(self) -> None:
super().__init__()
self.__running = threading.Event()
self.__running.set()
self.__detection = threading.Event()
self.__detection.clear()
self.__resume = threading.Event()
self.__resume.set()
# 多uwb驱动
self.multi_uwb = MultiUWB()
# 处理后的数据
self.tag_information = []
# 手环状态记录
self.tag_status = {}
# 手环平均记录
self.tag_merge_record = {}
# 模式
self.mode = None
# 设置开始记录时间
self.valid_time = 0
self.bias = 0
def set_valid_time(self, t):
self.valid_time = t
self.bias = 3
def start(self) -> None:
super().start()
self.multi_uwb.start()
for i in range(3):
if self.multi_uwb.start_all_record():
break
print(f"定位服务启动!共发现{len(self.multi_uwb.uwb_driver_table)}个基站!")
Thread(target=self._thread_data_processing, daemon=True).start()
# 获取手环状态
def get_tag_status(self, tag):
data = self.tag_status.get(tag)
if data:
return data
else:
return {HR: 0, BO: 0}
def get_anchor_coordinates(self):
anchor_coordinates = {
anchor: [0, 0]
for anchor in self.multi_uwb.uwb_driver_table.keys()
}
return anchor_coordinates
# 数据解析服务
def _thread_data_processing(self):
while self.__running.is_set():
self.__resume.wait()
# time.sleep(1000)
data = self.multi_uwb.get_data()
if data:
# print(data)
# 数据过滤策略1过滤掉无效的数据
if data[DISTANCE] > FILTER_DISTANCE or data[DISTANCE] < 1:
continue
# 数据过滤策略2过滤有效时间前的数据
if data[RECORD] < self.valid_time - self.bias:
continue
# 数据过滤策略3保留信号强度强于-85的数据
if data[TOTAL_RSSI] < FILTER_RSSI:
continue
tag = data[TAG_ID]
record_time = data[RECORD]
heart_rate = data[HR]
blood_oxygen = data[BO]
self.tag_status[tag] = {HR: heart_rate, BO: blood_oxygen}
anchor = data[ANCHOR_ID]
self.tag_merge_record.setdefault(tag, [])
self.tag_merge_record[tag].append({
LAST_RECORD_TIME: record_time,
DIST: data[DISTANCE],
ANCHOR_ID: data[ANCHOR_ID],
RSSI: data[TOTAL_RSSI]
})
# 筛选时间窗内的数据
self.tag_merge_record[tag] = list(
filter(
lambda x: record_time - x[LAST_RECORD_TIME] < FORGOT_TIME,
self.tag_merge_record[tag]
)
)
rssi = data[TOTAL_RSSI]
dist_mes = self.tag_merge_record[tag]
if data[IS_SUCCESSFUL]:
distance = sum([
data[DIST]
for data in self.tag_merge_record[tag]
]) / len(self.tag_merge_record[tag])
else:
distance = None
across = True if distance and distance <= ACROSS_DISTANCE or rssi >= ACROSS_RSSI else False
tag_package = {
TAG: tag,
ANCHOR_ID: anchor,
ACROSS: across,
RELIABILITY: len(self.tag_merge_record[tag]),
DIST: distance,
RSSI: rssi,
HEART_RATE: heart_rate,
BLOOD_OXYGEN: blood_oxygen,
RECORD: record_time,
DIST_MES: dist_mes,
IS_SUCCESSFUL: data[IS_SUCCESSFUL]
}
self.tag_information.append(
(record_time, tag_package)
)
def clear_information(self):
self.multi_uwb.clear_all_record()
self.tag_information.clear()
def get_last_information(self):
if self.tag_information:
return self.tag_information.pop(-1)
def get_information(self):
if self.tag_information:
return self.tag_information.pop(0)
def stop(self):
self.__running.clear()
self.__resume.set()
self.__detection.set()
self.multi_uwb.stop_all_record()
self.multi_uwb.stop()
def pause(self):
self.__resume.clear()
def resume(self):
self.__resume.set()
def set_tag_beep(self, tag, beep):
self.multi_uwb.set_tag(tag_id=tag, alarm_status=beep)
def get_tag_mes(self, tag, timeout=0.1):
tag_mes = self.multi_uwb.get_data(timeout=timeout)
if tag_mes and tag_mes[TAG_ID] == tag:
return tag_mes
def get_data(self):
tag_mes = self.multi_uwb.get_data()
return tag_mes
# 获取当前感知手环
def get_detect_tag(self, timeout=5, edge=DEFAULT_DISTANCE):
if self.mode != MODE_POSITION:
self.multi_uwb.start_all_record()
self.pause()
self.multi_uwb.clear_all_record()
start_time = time.time()
while time.time() - start_time < timeout:
data = self.multi_uwb.get_data()
if data:
try:
tag = data[TAG_ID]
distance = data[DISTANCE]
if distance < edge:
self.resume()
return tag
except KeyError:
continue
self.resume()
def get_all_band(self, detect_time=5):
self.pause()
if self.mode != MODE_POSITION:
self.multi_uwb.start_all_record()
time.sleep(detect_time)
result = [data[TAG_ID] for data in self.multi_uwb.get_all_data()]
self.resume()
return list(set(result))
def start_positioning(self, timeout=5):
start_time = time.time()
while time.time() - start_time < timeout:
if self.multi_uwb.start_all_record():
self.mode = MODE_POSITIONING
return True
return False
def clear_position_data(self):
self.multi_uwb.clear_all_record()
def stop_positioning(self, timeout=5):
start_time = time.time()
while time.time() - start_time < timeout:
if self.multi_uwb.stop_all_record():
self.mode = MODE_FREE
return True
return False
def close_tag(self, tag_id):
self.multi_uwb.close_tag(tag_id)
def set_tag(self, tag_id, freq=None, alarm_status=None, detection_config=None):
self.multi_uwb.set_tag(
tag_id, freq=freq, alarm_status=alarm_status,
detection_config=detection_config
)