# coding=gb2312 import json import os import random import traceback from threading import Thread import ping3 from UWB.data_node_via_tcp import DataNode, send_2, get_machine_ip_address from Speaker.speak_base import beep from UWB.uwb_driver import * from UWB.across_detect import AcrossDetect from UWB.algorithm import from_dist_get_coord from UWB.consensus import * from UWB.position_fixer import PositionFixer POSITION_SERVER_PORT = 7878 TYPE_TYPE = "type" TYPE_DATA = "data" TYPE_ANCHOR = "anchor" TYPE_TIME = "time" TYPE_VALID = "valid" TYPE_RESULT = "result" TYPE_START = "start" TYPE_END = "end" TYPE_TARGET = "target" TYPE_DISTANCE = "distance" TYPE_SUCCESS_TIME = "success_time" TYPE_RECORD_TIME = "record_time" CMD_SET_MODE = "set_mode" CMD_SET_MODE_RESULT = "set_mode_result" CMD_REPORT = "report" CMD_CLEAR_DATA = "clear_data" CMD_CLEAR_DATA_RESULT = "clear_data_result" CMD_REPORT_DATA = "report_data" CMD_REPORT_RESULT = "report_result" CMD_GET_INFO = "get_info" CMD_INFO = "info" CMD_TWR_REQUEST = "twr_request" CMD_TWR_RESULT = "twr_result" this_dir = os.path.dirname(os.path.realpath(__file__)) UWB_ASSIGNMENT_FILE_PATH = os.path.join(this_dir, "uwb_id_assignment.json") class Positioning(threading.Thread): def __init__(self) -> None: super().__init__() self.anchor_coordinates = None self.target_ip = None self.self_ip = None self.other_ip = None self.__running = threading.Event() self.__running.set() self.__detection = threading.Event() self.__detection.clear() self.__resume = threading.Event() self.__resume.clear() self.data_node = DataNode("0.0.0.0", port=POSITION_SERVER_PORT) self.data_node.start() # uwb驱动 self.uwb = UwbController() self.uwb.start() retry_time = 30 for i in range(retry_time + 1): if not self.uwb.set_anchor_status(status=MODE_POSITION) and i == retry_time: continue else: break # 检查基站分配表是否存在 if os.path.exists(UWB_ASSIGNMENT_FILE_PATH): with open(UWB_ASSIGNMENT_FILE_PATH, "r") as file: uwb_assignment = json.load(file) else: uwb_assignment = [i for i in range(8)] if uwb_assignment: seq = uwb_assignment.pop(0) else: seq = random.randint(0, 255) self.uwb.set_anchor_config(anchor_seq=seq, anchor_no=seq) self.anchor = self.uwb.anchor_id # 其他uwb设备信息 self.all_uwb_mes = {} self.uwb_coordinates = {} # 结果汇总 self.result_cache = {} # 时间同步 self.time_offset = 0 # 处理后的数据 self.tag_information = [] # 手环状态记录 self.tag_status = {} # 基站坐标 self.anchor_coordinates = {} # 模式 self.mode = None # 位置修正器 self.position_fixer = None # 冲线检测器 self.across_detect = AcrossDetect() # 未发送数据计数 self.send_fail_counting = 0 # 发送线程数 self.send_semaphore = threading.Semaphore(64) self.clear_cache_sign = threading.Event() self.clear_cache_sign.clear() # 设置开始记录时间 self.valid_time = 0 self.bias = 0 # 数据接受信号 self.__get_report_data_signal = threading.Event() self.__get_report_data_signal.clear() def set_valid_time(self, t): self.valid_time = t self.bias = 3 def start(self) -> None: super().start() Thread(target=self._thread_data_processing, daemon=True).start() Thread(target=self._thread_data_detect, daemon=True).start() # 获取手环状态 def get_tag_status(self, tag): data = self.tag_status.get(tag) if data: return data else: return {HR: 0, BO: 0} # 获取回报数据 def get_report_data(self, timeout=1): self.__get_report_data_signal.wait(timeout=timeout) data = self.get_cache(CMD_REPORT_DATA) if not data: self.__get_report_data_signal.clear() return data # 数据解析服务 def _thread_data_processing(self): while self.__running.is_set(): self.__resume.wait() data = self.get_report_data() if data: if data[DISTANCE] > 1500: continue if data[RECORD] < self.valid_time - self.bias: continue tag = data[TAG_ID] record_time = data[RECORD] heart_rate = data[HR] blood_oxygen = data[BO] self.tag_status[tag] = {HR: heart_rate, BO: blood_oxygen} distance = { data[ANCHOR_ID]: data[DISTANCE] } tag_package = { TAG: tag, DIST_MES: distance, HEART_RATE: heart_rate, BLOOD_OXYGEN: blood_oxygen, RECORD: record_time } across = False merge_position_data = None if self.mode == MODE_POSITIONING and self.position_fixer: self.position_fixer: PositionFixer self.position_fixer.get_position(tag_package) merge_position_data = self.position_fixer.get_record(tag) across = self.across_detect.position_detect(merge_position_data) tag_package[ACROSS] = across if across: if merge_position_data: if len(merge_position_data) >= 4: tag_package[RELIABILITY] = 3 elif len(merge_position_data) == 3 or len(merge_position_data) == 1: tag_package[RELIABILITY] = 2 else: tag_package[RELIABILITY] = 1 tag_package[DIST] = min(merge_position_data.values()) else: tag_package[RELIABILITY] = 0 tag_package[DIST] = float('inf') else: tag_package[RELIABILITY] = 0 tag_package[DIST] = float('inf') self.tag_information.append( (record_time, tag_package) ) time.sleep(0.001) def clear_information(self): self.mode = MODE_POSITIONING self.uwb.clear_all() self.tag_information.clear() retry_time = 9 for i in range(10): if not self.start_positioning() and i == retry_time: return False else: break def get_last_information(self): if self.tag_information: return self.tag_information.pop(-1) def get_information(self): if self.tag_information: return self.tag_information.pop(0) def stop(self): self.__running.clear() self.__resume.set() self.__detection.set() def pause(self): self.__resume.clear() self.__detection.clear() def resume(self): self.__resume.set() self.__detection.set() def get_anchor_coordinates(self): return self.anchor_coordinates @staticmethod # 基站定位 def anchor_positioning(anc_info, twr): # 获取基站列表 anc_list = list(anc_info.keys()) # 生成对照表 dist_table = {anchor: {} for anchor in anc_list} for dist_mes in twr: start = dist_mes[TYPE_START] stop = dist_mes[TYPE_END] dis = dist_mes[TYPE_DISTANCE] dist_table[start][stop] = dis dist_table[stop][start] = dis # 生成距离矩阵 dist_matrix = [ [ 0 if start == end else dist_table[start][end] for end in anc_list ] for start in anc_list ] # 生成基站坐标 coordinates = from_dist_get_coord(dist_matrix) min_x = 0 min_y = 0 for coord in coordinates: if coord[0] < min_x: min_x = coord[0] if coord[1] < min_y: min_y = coord[1] anchor_coordinates = { anc_list[i]: [ (coordinates[i][0] - min_x), (coordinates[i][1] - min_y) ] for i in range(len(anc_list)) } return anchor_coordinates # 设置为定位模式 def set_2_positioning_mode(self, timeout=5, ip_list=None): self.anchor_coordinates.clear() if not ip_list: ip_list = get_machine_ip_address() print("Find IP", ip_list) self.set_ip(other_ip=ip_list[1::], self_ip=ip_list[0]) # 获得基站列表 anc_info = self.get_all_anchor() print("Find Anchor", anc_info) anchor_coordinates = { anchor: [0, 0] for anchor in anc_info.keys() } self.anchor_coordinates = anchor_coordinates print("Success! Anchor Coordinates", self.anchor_coordinates) # 设置位置修正器 self.position_fixer = PositionFixer(self.anchor_coordinates) self.across_detect = AcrossDetect() self.across_detect.set_anchor_coordinates(self.anchor_coordinates) print("Set to Position Mode!") retry_time = 9 for i in range(10): if not self.start_positioning() and i == retry_time: return False else: break self.mode = MODE_POSITIONING print("Position Mode Initial Successfully!") return True def stop_positioning_mode(self): if self.mode == MODE_POSITIONING: retry_time = 3 for i in range(5): if not self.stop_positioning() and i == retry_time: return False else: break self.mode = None return True def set_tag_beep(self, tag, beep): self.uwb.set_tag(tag_id=tag, alarm_status=beep) def get_tag_mes(self, tag): tag_mes = self.uwb.get_data() if tag_mes and tag_mes[TAG_ID] == tag: return tag_mes def get_data(self): tag_mes = self.uwb.get_data() return tag_mes # 获取当前感知手环 def get_detect_tag(self, timeout=5, edge=DEFAULT_DISTANCE): start_time = time.time() if self.mode == MODE_POSITIONING: self.__detection.clear() self.uwb.clear_all() self.uwb.clear_cache(TYPE_DATA) while time.time() - start_time < timeout: data = self.uwb.get_data() if data: try: tag = data[TAG_ID] distance = data[DISTANCE] if distance < edge: if self.mode == MODE_POSITIONING: self.__detection.set() return tag except: continue if self.mode == MODE_POSITIONING: self.__detection.set() def get_all_band(self, detect_time=5): start_time = time.time() result = [] if self.mode == MODE_POSITIONING: self.__detection.clear() while time.time() - start_time < detect_time: data = self.uwb.get_data() if data: result.append(data[TAG_ID]) if self.mode == MODE_POSITIONING: self.__detection.set() if len(result) == 0: retry_time = 3 for i in range(5): if not self.uwb.set_anchor_status(status=MODE_POSITION) and i == retry_time: return None else: break return list(set(result)) def set_report_target(self, target_ip): self.target_ip = target_ip # 数据监听线程 def _thread_data_detect(self): while self.__running.is_set(): self.__detection.wait() # print('detect') data = self.uwb.get_data() if data: data.update({TYPE_RECORD_TIME: data[RECORD] + self.time_offset}) self.send_cmd(_type=CMD_REPORT_DATA, data=data, target_ip=self.target_ip) def get_cache(self, _type): if _type not in self.result_cache.keys(): return None else: if self.result_cache[_type]: return self.result_cache[_type].pop(0) else: return None def clear_cache(self, _type): if _type in self.result_cache.keys(): if self.result_cache[_type]: self.result_cache[_type].clear() def send_cmd(self, _type, data, target_ip, self_ip=None, block=False, timeout=30): pkg = { TYPE_TYPE: _type, TYPE_DATA: data, TYPE_TARGET: self_ip } def must_send_2(): start_time = time.time() while time.time() - start_time < timeout: if not send_2(target_ip, pkg, POSITION_SERVER_PORT): time.sleep(0.1) else: break self.send_semaphore.release() if block: start_time = time.time() while time.time() - start_time < timeout: if not send_2(target_ip, pkg, POSITION_SERVER_PORT): time.sleep(0.1) else: return True else: while not self.send_semaphore.acquire(timeout=0.01): if self.clear_cache_sign.is_set(): return True threading.Thread(target=must_send_2, daemon=True).start() return False # 设置设备ip地址 def set_ip(self, self_ip, other_ip): self.self_ip = self_ip self.other_ip = other_ip # 获得所有基站的信息 def get_all_anchor(self, timeout=7): self.all_uwb_mes = {self.uwb.anchor_id: self.self_ip} self.data_node.clear_data() for ip in self.other_ip: delay = ping3.ping(ip, timeout=5) if delay is not None: fix_time = time.time() + delay print(f"与设备{ip}链接延迟为{delay}s") send_successful = self.send_cmd( _type=CMD_GET_INFO, data=[ip, fix_time], target_ip=ip, self_ip=self.self_ip, block=True, timeout=timeout ) if send_successful: beep(duration=500, freq=3000) else: print(f"无法与设备{ip}建立链接") start_time = time.time() while time.time() - start_time < timeout: response = self.get_cache(CMD_INFO) if response: print(response) anchor, ip = response self.all_uwb_mes[anchor] = ip if len(self.all_uwb_mes) == len(self.other_ip) + 1: print("Done!") break beep(duration=1000, freq=500) return self.all_uwb_mes # 设置所有基站的模式 def set_all_anchor_mode(self, mode, timeout=10): self.clear_cache(CMD_SET_MODE_RESULT) for ip in list(set(list(self.all_uwb_mes.values()))): self.send_cmd( _type=CMD_SET_MODE, data=mode, target_ip=ip, self_ip=self.self_ip ) start_time = time.time() counting = 0 max_counting_num = len(self.all_uwb_mes) while time.time() - start_time < timeout: response = self.get_cache(CMD_SET_MODE_RESULT) if response is not None: counting += 1 if response is False: return False if counting == max_counting_num: return True return False # 获得所有基站相互测距结果 def twr(self, timeout=15): twr_result = [] all_anchor = list(self.all_uwb_mes.keys()) for i in range(len(all_anchor)): for j in range(i + 1, len(all_anchor)): target_ip = self.all_uwb_mes[all_anchor[i]] data = { TYPE_ANCHOR: all_anchor[j], TYPE_TIME: time.time() } self.send_cmd(_type=CMD_TWR_REQUEST, data=data, target_ip=target_ip, self_ip=self.self_ip) start_time = time.time() while time.time() - start_time < timeout: response = self.get_cache(CMD_TWR_RESULT) if response: if response[TYPE_VALID]: try: one_twr = { TYPE_START: all_anchor[i], TYPE_END: all_anchor[j], TYPE_DISTANCE: response[TYPE_RESULT][TYPE_DISTANCE] / 100, TYPE_SUCCESS_TIME: response[TYPE_RESULT][TYPE_SUCCESS_TIME] } twr_result.append(one_twr) print( f"Twr Success! {all_anchor[i]} --> {all_anchor[j]} | " f"Dis:{response[TYPE_RESULT][TYPE_DISTANCE]} | " f"SuccessTimes:{response[TYPE_RESULT][TYPE_SUCCESS_TIME]}" ) except TypeError: print("Twr Error:", response) return twr_result else: print(f"Twr Fail! {all_anchor[i]} --> {all_anchor[j]}") return twr_result break return twr_result def start_positioning(self, timeout=5): self.clear_cache(CMD_REPORT_RESULT) anchor_seq = 1 for ip in list(set(list(self.all_uwb_mes.values()))): self.send_cmd( _type=CMD_REPORT, data=[True, anchor_seq], target_ip=ip, self_ip=self.self_ip, block=True ) anchor_seq += 1 start_time = time.time() counting = 0 max_counting_num = len(self.all_uwb_mes) while time.time() - start_time < timeout: response = self.get_cache(CMD_REPORT_RESULT) if response is not None: counting += 1 if response is False: return False if counting == max_counting_num: return True return False def clear_position_data(self, timeout=5): self.clear_cache(CMD_REPORT_RESULT) for ip in list(set(list(self.all_uwb_mes.values()))): self.send_cmd( _type=CMD_CLEAR_DATA, data=None, target_ip=ip, self_ip=self.self_ip, block=True ) start_time = time.time() counting = 0 max_counting_num = len(self.all_uwb_mes) while time.time() - start_time < timeout: response = self.get_cache(CMD_CLEAR_DATA_RESULT) if response is not None: counting += 1 if response is False: return False if counting == max_counting_num: return True def stop_positioning(self, timeout=5): self.clear_cache(CMD_REPORT_RESULT) for ip in list(set(list(self.all_uwb_mes.values()))): self.send_cmd( _type=CMD_REPORT, data=[False], target_ip=ip, self_ip=self.self_ip, block=True ) start_time = time.time() counting = 0 max_counting_num = len(self.all_uwb_mes) while time.time() - start_time < timeout: response = self.get_cache(CMD_REPORT_RESULT) if response is not None: counting += 1 if response is False: return False if counting == max_counting_num: return True return False def run(self) -> None: while self.__running.is_set(): self.__resume.wait() data = self.data_node.get_data() if data: try: cmd_type = data[TYPE_TYPE] cmd_data = data[TYPE_DATA] target_ip = data[TYPE_TARGET] if cmd_type == CMD_SET_MODE: mode = cmd_data result = self.uwb.set_anchor_status(status=mode) self.send_cmd(_type=CMD_SET_MODE_RESULT, data=result, target_ip=target_ip) elif cmd_type == CMD_GET_INFO: myip = cmd_data[0] fix_time = cmd_data[1] self.time_offset = fix_time - time.time() print("cmd get message", self.uwb.anchor_id, myip, fix_time) success = False for i in range(3): result = [self.uwb.anchor_id, myip] if self.send_cmd(_type=CMD_INFO, data=result, target_ip=target_ip, block=True, timeout=5): beep(duration=300, freq=500) time.sleep(0.2) beep(duration=300, freq=500) success = True break if success: continue beep(duration=1000, freq=5000) elif cmd_type == CMD_CLEAR_DATA: # self.uwb.clear_all() self.uwb.clear_cache(TYPE_DATA) self.clear_information() self.send_cmd(_type=CMD_CLEAR_DATA_RESULT, data=None, target_ip=target_ip) elif cmd_type == CMD_REPORT: if cmd_data[0]: self.clear_cache_sign.set() self.uwb.clear_all() result = self.uwb.set_anchor_status(status=MODE_POSITION) anchor_seq = cmd_data[1] if result: result = self.uwb.set_anchor_config(anchor_seq=anchor_seq, anchor_no=anchor_seq) self.set_report_target(target_ip=target_ip) self.__detection.set() self.clear_cache_sign.clear() else: result = True self.__detection.clear() self.uwb.clear_all() self.send_cmd(_type=CMD_REPORT_RESULT, data=result, target_ip=target_ip) elif cmd_type == CMD_TWR_REQUEST: target_anchor = cmd_data[TYPE_ANCHOR] master_time = cmd_data[TYPE_TIME] self.time_offset = time.time() - master_time valid = self.uwb.twr(target_anchor) if valid: result = self.uwb.get_twr_result() else: result = None response = {TYPE_VALID: valid, TYPE_RESULT: result} self.send_cmd(_type=CMD_TWR_RESULT, data=response, target_ip=target_ip) beep(duration=1000, freq=500) elif cmd_type == CMD_REPORT_DATA: self.result_cache.setdefault(cmd_type, []) self.result_cache[cmd_type].append(cmd_data) self.__get_report_data_signal.set() else: self.result_cache.setdefault(cmd_type, []) self.result_cache[cmd_type].append(cmd_data) except Exception as e: print(f"Error:{e.args}") print(traceback.format_exc()) else: time.sleep(0.1) def close_tag(self, tag_id): self.uwb.close_tag(tag_id) def set_tag(self, tag_id, freq=None, alarm_status=None, detection_config=None): self.uwb.set_tag( tag_id, freq=freq, alarm_status=alarm_status, detection_config=detection_config )