LISHUZUOXUN_yangjiang/UWB/positioning_standalone.py

257 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=gb2312
import time
from threading import Thread
from UWB.uwb_driver import *
from UWB.consensus import *
from UWB.multi_uwb import MultiUWB
TYPE_TYPE = "type"
TYPE_DATA = "data"
TYPE_ANCHOR = "anchor"
TYPE_TIME = "time"
TYPE_VALID = "valid"
TYPE_RESULT = "result"
TYPE_START = "start"
TYPE_END = "end"
TYPE_TARGET = "target"
TYPE_DISTANCE = "distance"
TYPE_SUCCESS_TIME = "success_time"
TYPE_RECORD_TIME = "record_time"
CMD_SET_MODE = "set_mode"
CMD_SET_MODE_RESULT = "set_mode_result"
CMD_REPORT = "report"
CMD_CLEAR_DATA = "clear_data"
CMD_CLEAR_DATA_RESULT = "clear_data_result"
CMD_REPORT_DATA = "report_data"
CMD_REPORT_RESULT = "report_result"
CMD_GET_INFO = "get_info"
CMD_INFO = "info"
CMD_TWR_REQUEST = "twr_request"
CMD_TWR_RESULT = "twr_result"
class Positioning(threading.Thread):
def __init__(self) -> None:
super().__init__()
self.__running = threading.Event()
self.__running.set()
self.__detection = threading.Event()
self.__detection.clear()
self.__resume = threading.Event()
self.__resume.set()
# 多uwb驱动
self.multi_uwb = MultiUWB()
# 处理后的数据
self.tag_information = []
# 手环状态记录
self.tag_status = {}
# 手环平均记录
self.tag_merge_record = {}
# 模式
self.mode = None
# 设置开始记录时间
self.valid_time = 0
self.bias = 0
def set_valid_time(self, t):
self.valid_time = t
self.bias = 3
def start(self) -> None:
super().start()
self.multi_uwb.start()
for i in range(3):
if self.multi_uwb.start_all_record():
break
print(f"定位服务启动!共发现{len(self.multi_uwb.uwb_driver_table)}个基站!")
Thread(target=self._thread_data_processing, daemon=True).start()
# 获取手环状态
def get_tag_status(self, tag):
data = self.tag_status.get(tag)
if data:
return data
else:
return {HR: 0, BO: 0}
def get_anchor_coordinates(self):
anchor_coordinates = {
anchor: [0, 0]
for anchor in self.multi_uwb.uwb_driver_table.keys()
}
return anchor_coordinates
# 数据解析服务
def _thread_data_processing(self):
while self.__running.is_set():
self.__resume.wait()
# time.sleep(1000)
data = self.multi_uwb.get_data()
if data:
# print(data)
# 数据过滤策略1过滤掉无效的数据
if data[DISTANCE] > FILTER_DISTANCE or data[DISTANCE] < 1:
continue
if data[RECORD] < self.valid_time - self.bias:
continue
# 数据过滤策略2当测距成功时仅保留距离数据
if data[IS_SUCCESSFUL]:
pass
# if 500 <= data[DISTANCE] <= 700:
# data[TOTAL_RSSI] = -80
# elif data[DISTANCE] < 500:
# data[TOTAL_RSSI] = -75
# 数据过滤策略3测距失败时若信号数据不合理删除该条信号数据
else:
if data[TOTAL_RSSI] < FILTER_RSSI or data[TOTAL_RSSI] > -60:
continue
# 现象:测距失败时,当信号强度较强时(约-80其距离为200左右但实际距离为1500以上
# 背景:正常情况下,信号强度为-80左右时其距离范围很广
# 处置:当测距失败,将信号强度在-80~-84范围内的数据直接清除
if -84 <= data[TOTAL_RSSI] <= -80:
continue
# print(data)
tag = data[TAG_ID]
record_time = data[RECORD]
heart_rate = data[HR]
blood_oxygen = data[BO]
self.tag_status[tag] = {HR: heart_rate, BO: blood_oxygen}
self.tag_merge_record.setdefault(tag, [])
self.tag_merge_record[tag].append({
LAST_RECORD_TIME: record_time,
DIST: data[DISTANCE],
ANCHOR_ID: data[ANCHOR_ID],
RSSI: data[TOTAL_RSSI]
})
# 筛选时间窗内的数据
self.tag_merge_record[tag] = list(
filter(
lambda x: record_time - x[LAST_RECORD_TIME] < FORGOT_TIME,
self.tag_merge_record[tag]
)
)
dist_mes = self.tag_merge_record[tag]
avg_distance = sum([
data[DIST]
for data in self.tag_merge_record[tag]
]) / len(self.tag_merge_record[tag])
avg_rssi = sum([
data[RSSI]
for data in self.tag_merge_record[tag]
]) / len(self.tag_merge_record[tag])
across = True if avg_distance <= ACROSS_DISTANCE or avg_rssi >= ACROSS_RSSI else False
tag_package = {
TAG: tag,
ACROSS: across,
RELIABILITY: len(self.tag_merge_record[tag]),
DIST: avg_distance,
RSSI: avg_rssi,
HEART_RATE: heart_rate,
BLOOD_OXYGEN: blood_oxygen,
RECORD: record_time,
DIST_MES: dist_mes,
}
self.tag_information.append(
(record_time, tag_package)
)
def clear_information(self):
self.multi_uwb.clear_all_record()
self.tag_information.clear()
def get_last_information(self):
if self.tag_information:
return self.tag_information.pop(-1)
def get_information(self):
if self.tag_information:
return self.tag_information.pop(0)
def stop(self):
self.__running.clear()
self.__resume.set()
self.__detection.set()
self.multi_uwb.stop_all_record()
self.multi_uwb.stop()
def pause(self):
self.__resume.clear()
def resume(self):
self.__resume.set()
def set_tag_beep(self, tag, beep):
self.multi_uwb.set_tag(tag_id=tag, alarm_status=beep)
def get_tag_mes(self, tag):
tag_mes = self.multi_uwb.get_data()
if tag_mes and tag_mes[TAG_ID] == tag:
return tag_mes
# 获取当前感知手环
def get_detect_tag(self, timeout=5, edge=DEFAULT_DISTANCE):
if self.mode != MODE_POSITION:
self.multi_uwb.start_all_record()
self.pause()
self.multi_uwb.clear_all_record()
start_time = time.time()
while time.time() - start_time < timeout:
data = self.multi_uwb.get_data()
if data:
try:
tag = data[TAG_ID]
distance = data[DISTANCE]
if distance < edge:
self.resume()
return tag
except KeyError:
continue
self.resume()
def get_all_band(self, detect_time=5):
self.pause()
if self.mode != MODE_POSITION:
self.multi_uwb.start_all_record()
time.sleep(detect_time)
result = [data[TAG_ID] for data in self.multi_uwb.get_all_data()]
self.resume()
return list(set(result))
def start_positioning(self, timeout=5):
start_time = time.time()
while time.time() - start_time < timeout:
if self.multi_uwb.start_all_record():
self.mode = MODE_POSITIONING
return True
return False
def clear_position_data(self):
self.multi_uwb.clear_all_record()
def stop_positioning(self, timeout=5):
start_time = time.time()
while time.time() - start_time < timeout:
if self.multi_uwb.stop_all_record():
self.mode = MODE_FREE
return True
return False
def close_tag(self, tag_id):
self.multi_uwb.close_tag(tag_id)
def set_tag(self, tag_id, freq=None, alarm_status=None, detection_config=None):
self.multi_uwb.set_tag(
tag_id, freq=freq, alarm_status=alarm_status,
detection_config=detection_config
)