LISHUZUOXUN_yangjiang/UWB/uwb_driver.py

788 lines
28 KiB
Python
Raw Normal View History

2024-09-23 14:54:15 +08:00
# coding=gb2312
import binascii
import re
import threading
import time
import serial
import serial.tools.list_ports
from LogRecord.log_recorder import GLOBAL_LOG
def crc16(data):
    """Compute the CRC-16 checksum appended to every UWB serial frame.

    This is the bit-reflected CRC-16 with polynomial 0xA001 (0x8005 reversed)
    and initial value 0xFFFF.

    Args:
        data: Iterable of byte values (for example ``bytes``) to checksum.

    Returns:
        The 16-bit checksum as two bytes, least-significant byte first.
    """
    remainder = 0xFFFF
    for octet in data:
        # Fold the next input byte into the low byte of the remainder.
        remainder ^= octet
        for _bit in range(8):
            # Shift out one bit; apply the polynomial when that bit was set.
            carry = remainder & 0x0001
            remainder >>= 1
            if carry:
                remainder ^= 0xA001
    return remainder.to_bytes(2, byteorder='little', signed=False)
# Probe a serial port for a UWB module
def detect_uwb(ser):
    """Probe a serial port for a UWB module.

    Writes the fixed probe frame ``EB 01 FF 40 24``, waits 0.5 s for the
    device to answer, and reports whether that same frame appears in the bytes
    received.

    Args:
        ser: Open serial-like object providing ``write`` and ``read_all``.

    Returns:
        True when the probe frame is found in the reply, otherwise False.
    """
    probe = bytes.fromhex('EB01FF4024')
    ser.write(probe)
    time.sleep(0.5)
    # Match on raw bytes instead of a hex string: a hex-text search can match
    # at an odd nibble offset, i.e. across byte boundaries, and report a module
    # that never echoed the frame.
    return probe in ser.read_all()
# Tag status byte values
MODE_NORMAL = b"\xab"
MODE_ALARM = b"\xee"
# Tag wearing state
MODE_UNWEAR = 0
MODE_WEAR = 1
MODE_ILLEGAL = 2
# Keys of the dictionaries returned by the frame decoders below
ANCHOR_ID = "anchor_id"
TAG_ID = "tag_id"
BATTERY = "battery"
STATUS = "status"
DISTANCE = "distance"
DYNAMIC_FREQ = "dynamic_freq"
INACTIVE_FREQ = "inactive_freq"
ALARM_FREQ = "alarm_freq"
HR = "hr"
BO = "bo"
DETECT = "detect"
HR_ALARM_VALUE_MAX = "hr_alarm_value_max"
BO_ALARM_VALUE_MIN = "bo_alarm_value_min"
HR_OPT_VALUE_MAX = "hr_opt_value_max"
HR_OPT_VALUE_MIN = "hr_opt_value_min"
BO_OPT_VALUE_MAX = "bo_opt_value_max"
BO_OPT_VALUE_MIN = "bo_opt_value_min"
ILLEGAL_ALARM_TIMES = "illegal_alarm_times"
RECORD = "record_time"
IS_SUCCESSFUL = "is_successful"
FIRST_RSSI = "first_rssi"
TOTAL_RSSI = "total_rssi"


def data_decode_func_1_5(data):
    """Decode a data-frame payload that includes the success flag and RSSI fields.

    Compared with ``data_decode_func`` this layout carries one extra byte at
    offset 17 plus two RSSI words, so every later field sits one byte further
    on. Multi-byte integers are big-endian; the anchor and tag IDs are stored
    byte-reversed and returned as hex strings.

    Args:
        data: Frame payload as ``bytes``; must be at least 46 bytes long.

    Returns:
        dict keyed by the module-level key constants, with ``RECORD`` set to
        the decode time from ``time.time()``.

    Raises:
        IndexError: If ``data`` is shorter than the layout requires.
    """
    anchor_id_reverse = data[0: 8]
    tag_id_reverse = data[8: 14]
    tag_battery = data[14]
    tag_status = data[15:16]  # kept as a 1-byte slice, comparable to MODE_NORMAL / MODE_ALARM
    tag_is_successful = data[17]
    tag_distance = data[18:20]
    tag_first_rssi = data[20:22]
    tag_total_rssi = data[22:24]
    tag_dynamic_freq = data[25:27]
    tag_inactive_freq = data[27:29]
    # Was data[29:21], an empty slice that always decoded to 0. The field is the
    # two bytes following the inactive frequency, mirroring data_decode_func.
    tag_alarm_freq = data[29:31]
    tag_hr = data[34]
    tag_bo = data[35]
    tag_detect = data[38]
    hr_alarm_value_max = data[39]
    bo_alarm_value_min = data[40]
    hr_opt_value_max = data[41]
    hr_opt_value_min = data[42]
    bo_opt_value_max = data[43]
    bo_opt_value_min = data[44]
    illegal_alarm_times = data[45]
    # IDs arrive least-significant byte first; reverse them for display.
    anchor_id = anchor_id_reverse[::-1].hex()
    tag_id = tag_id_reverse[::-1].hex()
    return {
        ANCHOR_ID: anchor_id,
        TAG_ID: tag_id,
        BATTERY: tag_battery,
        STATUS: tag_status,
        DISTANCE: int.from_bytes(tag_distance, byteorder="big", signed=False),
        IS_SUCCESSFUL: tag_is_successful,
        FIRST_RSSI: int.from_bytes(tag_first_rssi, byteorder="big", signed=True),
        TOTAL_RSSI: int.from_bytes(tag_total_rssi, byteorder="big", signed=True),
        DYNAMIC_FREQ: int.from_bytes(tag_dynamic_freq, byteorder="big", signed=False),
        INACTIVE_FREQ: int.from_bytes(tag_inactive_freq, byteorder="big", signed=False),
        ALARM_FREQ: int.from_bytes(tag_alarm_freq, byteorder="big", signed=False),
        HR: tag_hr,
        BO: tag_bo,
        DETECT: tag_detect,
        HR_ALARM_VALUE_MAX: hr_alarm_value_max,
        BO_ALARM_VALUE_MIN: bo_alarm_value_min,
        HR_OPT_VALUE_MAX: hr_opt_value_max,
        HR_OPT_VALUE_MIN: hr_opt_value_min,
        BO_OPT_VALUE_MAX: bo_opt_value_max,
        BO_OPT_VALUE_MIN: bo_opt_value_min,
        ILLEGAL_ALARM_TIMES: illegal_alarm_times,
        RECORD: time.time()
    }
def data_decode_func(data):
    """Decode a data-frame payload in the layout without RSSI fields.

    Multi-byte integers are big-endian and unsigned; the anchor and tag IDs
    are stored byte-reversed and returned as hex strings.

    Args:
        data: Frame payload as ``bytes``; must be at least 45 bytes long.

    Returns:
        dict keyed by the module-level key constants, with ``RECORD`` set to
        the decode time from ``time.time()``.

    Raises:
        IndexError: If ``data`` is shorter than the layout requires.
    """
    def unsigned(first, last):
        # Big-endian unsigned integer stored in data[first:last].
        return int.from_bytes(data[first:last], byteorder="big", signed=False)

    # Single-byte fields are read by index so that a short payload raises.
    battery, hr, bo, detect = data[14], data[33], data[34], data[37]
    trailing_keys = (
        HR_ALARM_VALUE_MAX,
        BO_ALARM_VALUE_MIN,
        HR_OPT_VALUE_MAX,
        HR_OPT_VALUE_MIN,
        BO_OPT_VALUE_MAX,
        BO_OPT_VALUE_MIN,
        ILLEGAL_ALARM_TIMES,
    )
    # The seven trailing settings occupy consecutive bytes 38..44.
    trailing = {key: data[38 + offset] for offset, key in enumerate(trailing_keys)}

    decoded = {
        ANCHOR_ID: data[0:8][::-1].hex(),
        TAG_ID: data[8:14][::-1].hex(),
        BATTERY: battery,
        STATUS: data[15:16],
        DISTANCE: unsigned(17, 19),
        DYNAMIC_FREQ: unsigned(24, 26),
        INACTIVE_FREQ: unsigned(26, 28),
        ALARM_FREQ: unsigned(28, 30),
        HR: hr,
        BO: bo,
        DETECT: detect,
    }
    decoded.update(trailing)
    decoded[RECORD] = time.time()
    return decoded
# Anchor operating modes
MODE_FREE = 0
MODE_POSITION = 1
MODE_TWR = 2
# Command type bytes: the third byte of a frame, used by UwbController as the
# key under which received frames are cached
TYPE_DATA = b'\xa0'
TYPE_READ_STATUS = b'\xa3'
TYPE_WRITE_STATUS = b'\xc3'
TYPE_WRITE_CONFIG = b'\xc4'
TYPE_REBOOT = b'\xd0'
TYPE_TWR = b'\xc5'
TYPE_TWR_RESULT = b'\xa1'
TYPE_TEMP = b'\xa5'
TYPE_TAG_CONFIG = b'\xc7'
TYPE_ANCHOR_CONFIG = b'\xa2'
TYPE_SET_ANCHOR_CONFIG = b'\xc2'
class MySerial(serial.Serial):
    """``serial.Serial`` subclass with pass-through ``write`` / ``read`` hooks.

    Both overrides delegate straight to the parent class. They appear to exist
    as a single place to switch on traffic logging while debugging (see the
    disabled ``GLOBAL_LOG`` calls below).
    """

    def write(self, data):
        """Write *data* to the port and return the parent's result."""
        # TODO: debug hook
        # GLOBAL_LOG.write("write:" + data.hex())
        written = super().write(data)
        return written

    def read(self, size=1):
        """Read up to *size* bytes from the port and return them."""
        # TODO: debug hook
        # GLOBAL_LOG.write("read:" + data.hex())
        return super().read(size)
class UwbController:
device_list = []
def __init__(self, uwb_timeout=0.1) -> None:
    """Locate a UWB anchor on a CP210x serial port and prepare the controller.

    Each serial port whose description contains ``CP210`` and which is not
    already listed in ``UwbController.device_list`` is opened at 115200 baud,
    switched to ``MODE_FREE`` and probed with ``detect_uwb``. The first port
    that answers the probe and the anchor-info query is kept open and recorded
    in ``device_list``. Ports that fail are closed again.

    After construction ``is_initialed()`` tells whether an anchor was found.

    Args:
        uwb_timeout: Read timeout in seconds for the serial port.
    """
    super().__init__()
    # --- thread coordination events ---
    self.__running = threading.Event()
    self.__running.set()
    self.__data_signal = threading.Event()
    self.__data_signal.clear()
    self.__resume = threading.Event()
    self.__resume.set()
    self.__get_new_position_data = threading.Event()
    self.__get_new_position_data.clear()
    # --- device discovery ---
    self.anchor_id = None
    self.anchor_version = None
    # Always hold a (possibly unopened) serial object so later calls such as
    # self.ser.isOpen() never hit None, even when the machine has no ports.
    self.ser = MySerial(timeout=uwb_timeout, baudrate=115200)
    for port in serial.tools.list_ports.comports():
        device = str(port.device)
        if device in UwbController.device_list:
            continue
        if 'CP210' not in str(port.description):
            continue
        # Fresh object per candidate so a failed attempt leaves no state behind.
        self.ser = MySerial(timeout=uwb_timeout, baudrate=115200)
        self.ser.port = device
        try:
            self.ser.open()
            self.set_anchor_status_directly(MODE_FREE)
            time.sleep(0.5)
            if detect_uwb(self.ser):
                anchor_info = self.get_anchor_mes()
                if anchor_info is not None:
                    self.anchor_id, self.anchor_version = anchor_info
                    # Claim the port only once the anchor is fully identified.
                    UwbController.device_list.append(device)
                    print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>վIDΪ<EFBFBD><EFBFBD>{self.anchor_id}<EFBFBD><EFBFBD><EFBFBD><EFBFBD>վ<EFBFBD>汾Ϊ<EFBFBD><EFBFBD>{self.anchor_version}<EFBFBD><EFBFBD>")
                    break
            # Not a usable UWB anchor: release the port before trying the next.
            self.ser.close()
        except Exception:
            if self.ser.isOpen():
                self.ser.close()
    # --- device state ---
    self.is_start = self.anchor_id is not None and self.ser.isOpen()
    # --- data buffers ---
    self.buffer = b""
    self.data_cache = {}
    self.decode_func = {
        TYPE_DATA: data_decode_func_1_5
    }
# Initialisation status check
def is_initialed(self):
return self.is_start
"""************************************<2A>߳̿<DFB3><CCBF><EFBFBD>ģ<EFBFBD><C4A3>************************************"""
def start(self):
threading.Thread(target=self._threading_data_recv, daemon=True).start()
threading.Thread(target=self._threading_data_decode, daemon=True).start()
def pause(self):
self.__resume.clear()
def resume(self):
self.__resume.set()
def stop(self):
self.__resume.set()
self.__data_signal.set()
self.__running.clear()
"""************************************<2A>ϲ<EFBFBD><CFB2><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>************************************"""
def get_cache(self, _type):
if _type not in self.data_cache.keys():
return None
else:
try:
if self.data_cache.get(_type):
return self.data_cache[_type].pop(0)
else:
return None
except KeyError:
return None
def clear_cache(self, _type):
if _type in self.data_cache.keys():
if self.data_cache[_type]:
self.data_cache[_type].clear()
def clear_all(self):
self.data_cache = {}
self.buffer = b""
def _threading_data_recv(self):
while self.__running.is_set():
self.__resume.wait()
try:
bytes_data = self.ser.read(128)
# print(len(bytes_data))
if bytes_data:
self.buffer += bytes_data
self.__data_signal.set()
except:
time.sleep(0.001)
def _threading_data_decode(self):
    """Decoder loop: split ``self.buffer`` into frames and cache them by type.

    Frame layout as parsed here: sync byte 0xEB, one length byte ``n``, ``n``
    bytes consisting of the type byte followed by the payload, then a 2-byte
    CRC over everything before it, so ``n + 4`` bytes in total.

    For each frame with a valid CRC, the payload together with the trailing
    CRC bytes is either passed to the decoder registered in
    ``self.decode_func`` or cached raw, under the frame's type byte in
    ``self.data_cache``. Callers match acknowledgements against that raw form.

    NOTE: ``self.buffer`` is shared with the receiver thread without a lock.
    """
    while self.__running.is_set():
        self.__resume.wait()
        try:
            self.__data_signal.wait()
            self.__data_signal.clear()
            while True:
                snapshot = self.buffer
                sync_at = snapshot.find(b"\xeb")
                if sync_at < 0:
                    # Nothing but noise: drop what was inspected so junk cannot
                    # pile up and stall the parser. Bytes appended since the
                    # snapshot are kept.
                    self.buffer = self.buffer[len(snapshot):]
                    break
                # Discard noise in front of the sync byte.
                self.buffer = self.buffer[sync_at:]
                if len(self.buffer) < 2:
                    break
                eb_len = self.buffer[1]
                frame_len = eb_len + 4
                if len(self.buffer) < frame_len:
                    # Frame not fully received yet; wait for more data.
                    break
                frame = self.buffer[:frame_len]
                if crc16(frame[:-2]) != frame[-2:]:
                    # False sync byte or corrupted frame: skip only this byte
                    # so a real frame starting inside the span is not lost.
                    self.buffer = self.buffer[1:]
                    continue
                # Consume the whole frame, CRC included.
                self.buffer = self.buffer[frame_len:]
                eb_type = frame[2:3]
                data = frame[3:]  # payload followed by the two CRC bytes
                decoder = self.decode_func.get(eb_type)
                decode_data = decoder(data) if decoder else data
                self.data_cache.setdefault(eb_type, []).append(decode_data)
                if eb_type == TYPE_DATA:
                    self.__get_new_position_data.set()
        except Exception as e:
            # Report the failure. The original print sat after ``continue`` and
            # could never run.
            print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
# Read the anchor ID and version information
def get_anchor_mes(self):
cmd = 'EB01C00034'
tx_str = bytes(bytearray.fromhex(cmd))
try:
self.ser.write(tx_str)
time.sleep(0.5)
bytes_data = self.ser.read_all()
info = re.search(rb"(?<=\xEB\x0E\xC0)[\x00-\xFF]{13}", bytes_data).group()
anchor_id_rev = [info[i:i + 1].hex() for i in range(8)]
anchor_id_rev.reverse()
anchor_id = "".join(anchor_id_rev)
anchor_version = "{}.{}.{}.{}".format(int(info[8]), int(info[9]), int(info[10]), int(info[11]))
return anchor_id, anchor_version
except Exception as e:
print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
return
# Read the anchor's current status
def get_anchor_status(self, timeout=1):
    """Request the anchor status and wait for the decoded reply.

    Relies on the decode thread to deliver the reply through the cache.

    Args:
        timeout: Seconds to wait for the reply.

    Returns:
        The status byte as an int, or None on timeout or error; errors are
        printed.
    """
    request = bytes.fromhex('EB01A3401D')
    try:
        self.clear_cache(TYPE_READ_STATUS)
        self.ser.write(request)
        began = time.time()
        while time.time() - began < timeout:
            time.sleep(0.05)
            reply = self.get_cache(TYPE_READ_STATUS)
            if not reply:
                continue
            # Require at least three bytes (status + CRC) before trusting it.
            head = re.search(rb"[\x00-\xFF]{3}", reply).group()
            return head[0]
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return
# Set the anchor status directly, without waiting for an acknowledgement
def set_anchor_status_directly(self, status):
    """Send a set-status frame without waiting for the anchor's reply.

    Args:
        status: Status code in 0..255, for example ``MODE_FREE``.

    Raises:
        OverflowError: If ``status`` does not fit in one unsigned byte.
    """
    frame = b"\xEB\x02\xC3" + status.to_bytes(1, byteorder='little', signed=False)
    frame = frame + crc16(frame)
    try:
        self.ser.write(frame)
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
# Switch the anchor to status 0x04 (going by the method name: close all tags in range)
def close_all_tag(self, ):
tx_str = b"\xEB\x02\xC3\x04\xC5\x33"
try:
self.ser.write(tx_str)
except Exception as e:
print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
# Switch the anchor back to status 0x00 (going by the method name: stop closing tags)
def stop_close_all_tag(self):
tx_str = b"\xEB\x02\xC3\x00\xC4\xF0"
try:
self.ser.write(tx_str)
except Exception as e:
print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
# Set the anchor status and wait for the acknowledgement
def set_anchor_status(self, status, timeout=1):
    """Set the anchor status and wait for the anchor to echo it back.

    Relies on the decode thread to deliver the reply through the cache.

    Args:
        status: Status code in 0..255, for example ``MODE_POSITION``.
        timeout: Seconds to wait for the acknowledgement.

    Returns:
        True when the first reply contains the echoed status and CRC; False on
        a differing reply, timeout or error. Errors are printed.

    Raises:
        OverflowError: If ``status`` does not fit in one unsigned byte.
    """
    status_bytes = status.to_bytes(1, byteorder='little', signed=False)
    tx_str = b"\xEB\x02\xC3" + status_bytes
    crc = crc16(tx_str)
    tx_str += crc
    expected = status_bytes + crc
    try:
        self.clear_cache(TYPE_WRITE_STATUS)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_WRITE_STATUS)
            if bytes_data:
                # Plain containment: the expected bytes are arbitrary and may
                # contain regex metacharacters, so they must not be compiled
                # as a pattern.
                return expected in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
# Set the anchor sequence and number
def set_anchor_config(self, anchor_seq: int, anchor_no: int, timeout=1):
    """Write the anchor sequence and number and wait for the echo.

    Relies on the decode thread to deliver the reply through the cache.

    Args:
        anchor_seq: Sequence value, sent as one byte.
        anchor_no: Anchor number, sent as two big-endian bytes.
        timeout: Seconds to wait for the acknowledgement.

    Returns:
        True when the first reply contains the echoed values and CRC; False on
        a differing reply, timeout or error. Errors are printed.

    Raises:
        OverflowError: If a value does not fit its field.
    """
    anchor_seq_bytes = anchor_seq.to_bytes(1, byteorder='little', signed=False)
    anchor_no_bytes = anchor_no.to_bytes(2, byteorder='big', signed=False)
    tx_str = b"\xEB\x04\xC4" + anchor_seq_bytes + anchor_no_bytes
    crc = crc16(tx_str)
    tx_str += crc
    expected = anchor_seq_bytes + anchor_no_bytes + crc
    try:
        self.clear_cache(TYPE_WRITE_CONFIG)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_WRITE_CONFIG)
            if bytes_data:
                # Plain containment: the expected bytes are arbitrary and may
                # contain regex metacharacters, so they must not be compiled
                # as a pattern.
                return expected in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
# Reboot the anchor
def reboot(self, timeout=5):
    """Send the reboot command and wait for its acknowledgement.

    Relies on the decode thread to deliver the reply through the cache.

    Args:
        timeout: Seconds to wait for the acknowledgement.

    Returns:
        True when the first reply contains ``01 F8``; False on a differing
        reply, timeout or error. Errors are printed.
    """
    tx_str = bytes.fromhex('EB01D001F8')
    try:
        self.clear_cache(TYPE_REBOOT)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_REBOOT)
            if bytes_data:
                # A differing reply is an ordinary False, not an
                # AttributeError raised by calling .group() on a failed match.
                return b"\x01\xF8" in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
def twr(self, target_anchor: str, timeout=10):
    """Start a TWR measurement against another anchor and wait for the ack.

    The measurement result itself is fetched with ``get_twr_result``.
    Relies on the decode thread to deliver the reply through the cache.

    Args:
        target_anchor: ID of the target anchor as a hex string; it is sent
            byte-reversed.
        timeout: Seconds to wait for the acknowledgement.

    Returns:
        True when the first reply contains ``01 06 90``; False on a differing
        reply, timeout or error. Errors are printed.

    Raises:
        ValueError: If ``target_anchor`` is not valid hex.
    """
    anchor_reverse = bytes.fromhex(target_anchor)[::-1]
    tx_str = bytes.fromhex('EB0AC501') + anchor_reverse
    tx_str += crc16(tx_str)
    check_code = bytes.fromhex("010690")
    try:
        # Drop stale acks and stale results before starting a new measurement.
        self.clear_cache(TYPE_TWR)
        self.clear_cache(TYPE_TWR_RESULT)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_TWR)
            if bytes_data:
                # A differing reply is an ordinary False, not an
                # AttributeError raised by calling .group() on a failed match.
                return check_code in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
# Fetch the result of a TWR measurement
def get_twr_result(self, timeout=10):
    """Wait for a TWR result frame and decode it.

    Args:
        timeout: Seconds to wait for the result.

    Returns:
        ``{"distance": int, "success_time": int}`` decoded from the first four
        payload bytes (two big-endian unsigned words), or None on timeout.
        Errors are printed and the wait continues.
    """
    began = time.time()
    while time.time() - began < timeout:
        try:
            time.sleep(0.05)
            result = self.get_cache(TYPE_TWR_RESULT)
            if not result:
                continue
            words = [
                int.from_bytes(result[at:at + 2], byteorder="big", signed=False)
                for at in (0, 2)
            ]
            return {
                "distance": words[0], "success_time": words[1]
            }
        except Exception as e:
            print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
# Read the module temperature
def get_temp(self, timeout=1):
    """Request the module temperature and wait for the reply.

    Relies on the decode thread to deliver the reply through the cache.

    Args:
        timeout: Seconds to wait for the reply.

    Returns:
        The first two reply bytes as a big-endian unsigned integer scaled by
        0.01, or False on timeout or error. Errors are printed.
    """
    request = bytes.fromhex('EB01A5C01F')
    try:
        self.clear_cache(TYPE_TEMP)
        self.ser.write(request)
        began = time.time()
        while time.time() - began < timeout:
            time.sleep(0.05)
            reply = self.get_cache(TYPE_TEMP)
            if not reply:
                continue
            raw = int.from_bytes(reply[0:2], byteorder="big", signed=False)
            return raw * 0.01
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
# Fetch the next decoded data record
def get_data(self, timeout=0.1):
    """Return the oldest decoded data record, waiting briefly for one to arrive.

    Args:
        timeout: Maximum seconds to wait for the new-data event.

    Returns:
        The next cached ``TYPE_DATA`` entry, or None when none is available.
    """
    self.__get_new_position_data.wait(timeout=timeout)
    record = self.get_cache(TYPE_DATA)
    if record:
        return record
    # Cache drained: reset the event so the next call really waits.
    self.__get_new_position_data.clear()
    return record
# Send the "clear" configuration frame to a tag
def clear_tag(self, tag_id, timeout=5):
    """Send a tag-configuration frame with alarm-enable byte 0x02 and wait for the ack.

    All other configuration fields are zero. The frame differs from the one
    sent by ``close_tag`` only in the alarm-enable byte (0x02 here, 0x03 there).
    Relies on the decode thread to deliver the reply through the cache.

    Args:
        tag_id: Tag ID as a hex string; it is sent byte-reversed.
        timeout: Seconds to wait for the acknowledgement.

    Returns:
        True when the first reply contains ``41 F6``; False on a differing
        reply, timeout or error. Errors are printed.

    Raises:
        ValueError: If ``tag_id`` is not valid hex.
    """
    tag_id_reverse = bytes.fromhex(tag_id)[::-1]
    tx_str = (
        bytes.fromhex('EB19C7')
        + tag_id_reverse
        + b"\x01"
        + b"\x00" * 7   # config-enable flag + three 2-byte frequencies
        + b"\x00" * 2   # power enable + power config
        + b"\x00" * 6   # six 1-byte threshold settings
        + b"\x02\x00"   # alarm enable = 0x02, alarm config = 0x00
    )
    tx_str += crc16(tx_str)
    try:
        self.clear_cache(TYPE_TAG_CONFIG)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_TAG_CONFIG)
            if bytes_data:
                # A differing reply is an ordinary False, not an
                # AttributeError raised by calling .group() on a failed match.
                return b"\x41\xF6" in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
# Send the "close" configuration frame to a tag
def close_tag(self, tag_id, timeout=5):
    """Send a tag-configuration frame with alarm-enable byte 0x03 and wait for the ack.

    All other configuration fields are zero. The frame differs from the one
    sent by ``clear_tag`` only in the alarm-enable byte (0x03 here, 0x02 there).
    Relies on the decode thread to deliver the reply through the cache.

    Args:
        tag_id: Tag ID as a hex string; it is sent byte-reversed.
        timeout: Seconds to wait for the acknowledgement.

    Returns:
        True when the first reply contains ``41 F6``; False on a differing
        reply, timeout or error. Errors are printed.

    Raises:
        ValueError: If ``tag_id`` is not valid hex.
    """
    tag_id_reverse = bytes.fromhex(tag_id)[::-1]
    tx_str = (
        bytes.fromhex('EB19C7')
        + tag_id_reverse
        + b"\x01"
        + b"\x00" * 7   # config-enable flag + three 2-byte frequencies
        + b"\x00" * 2   # power enable + power config
        + b"\x00" * 6   # six 1-byte threshold settings
        + b"\x03\x00"   # alarm enable = 0x03, alarm config = 0x00
    )
    tx_str += crc16(tx_str)
    try:
        self.clear_cache(TYPE_TAG_CONFIG)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_TAG_CONFIG)
            if bytes_data:
                # A differing reply is an ordinary False, not an
                # AttributeError raised by calling .group() on a failed match.
                return b"\x41\xF6" in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
# Configure a tag: report frequencies, detection thresholds and alarm state
def set_tag(self, tag_id, freq=None, alarm_status=None, detection_config=None, timeout=1):
    """Send a tag-configuration frame and wait for the acknowledgement.

    Sections that are not supplied are sent as zeros with their enable flag
    cleared. Relies on the decode thread to deliver the reply through the cache.

    Args:
        tag_id: Tag ID as a hex string; it is sent byte-reversed.
        freq: Optional ``(dynamic_freq, inactive_freq, alarm_freq)``, each sent
            as a 2-byte big-endian unsigned value.
        alarm_status: None leaves the alarm section disabled; a truthy value
            sends alarm config 0x01 and a falsy one 0x02.
        detection_config: Optional 6-tuple ``(hr_alarm_value_max,
            bo_alarm_value_min, hr_opt_value_max, hr_opt_value_min,
            bo_opt_value_max, bo_opt_value_min)``, each sent as one byte.
        timeout: Seconds to wait for the acknowledgement.

    Returns:
        True when the first reply contains ``41 F6``; False on a differing
        reply, timeout or error. Errors are printed.

    Raises:
        ValueError: If ``tag_id`` is not valid hex or a tuple has the wrong
            number of items.
        OverflowError: If a value does not fit its field.
    """
    tx_str = bytes.fromhex('EB19C7') + bytes.fromhex(tag_id)[::-1] + b"\x01"

    # Frequency section, preceded by the shared config-enable flag.
    config_enable = b"\x01" if (freq or detection_config) else b"\x00"
    if freq:
        dynamic_freq, inactive_freq, alarm_freq = freq
        freq_bytes = b"".join(
            int(value).to_bytes(length=2, byteorder="big", signed=False)
            for value in (dynamic_freq, inactive_freq, alarm_freq)
        )
    else:
        freq_bytes = b"\x00" * 6
    tx_str += config_enable + freq_bytes

    # Power section: always disabled.
    tx_str += b'\x00' + b'\x00'

    # Detection-threshold section.
    if detection_config:
        hr_alarm_value_max, bo_alarm_value_min, \
            hr_opt_value_max, hr_opt_value_min, \
            bo_opt_value_max, bo_opt_value_min = detection_config
        tx_str += b"".join(
            int(value).to_bytes(length=1, byteorder="big", signed=False)
            for value in (
                hr_alarm_value_max, bo_alarm_value_min,
                hr_opt_value_max, hr_opt_value_min,
                bo_opt_value_max, bo_opt_value_min,
            )
        )
    else:
        tx_str += b"\x00" * 6

    # Alarm section.
    if alarm_status is not None:
        tx_str += b"\x01" + (b"\x01" if alarm_status else b"\x02")
    else:
        tx_str += b"\x00" + b"\x00"

    tx_str += crc16(tx_str)
    try:
        self.clear_cache(TYPE_TAG_CONFIG)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_TAG_CONFIG)
            if bytes_data:
                # A differing reply is an ordinary False, not an
                # AttributeError raised by calling .group() on a failed match.
                return b"\x41\xF6" in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
def get_anchor_config(self, timeout=3):
    """Request the anchor configuration and wait for the reply.

    Relies on the decode thread to deliver the reply through the cache.

    Args:
        timeout: Seconds to wait for the reply.

    Returns:
        The raw reply bytes (payload followed by the two CRC bytes), or None
        on timeout or error. Errors are printed.
    """
    request = bytes.fromhex('EB02A20FAD64')
    try:
        self.clear_cache(TYPE_ANCHOR_CONFIG)
        self.ser.write(request)
        began = time.time()
        while time.time() - began < timeout:
            time.sleep(0.05)
            reply = self.get_cache(TYPE_ANCHOR_CONFIG)
            if not reply:
                continue
            return re.search(rb"[\x00-\xFF]+", reply).group()
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return
def set_anchor_power(self, power: int, timeout=5):
    """Rewrite the anchor configuration with a new power value.

    Reads the current configuration, stores ``power`` in byte 2, zeroes
    byte 19, strips the two trailing CRC bytes and writes the result back.
    Relies on the decode thread to deliver replies through the cache.

    Args:
        power: Power value; clamped to 0..255.
        timeout: Seconds to wait for each of the read and the write.

    Returns:
        True when the write is acknowledged with ``81 F5``; False when the
        configuration cannot be read, is too short, the acknowledgement
        differs, or on timeout or error. Errors are printed.
    """
    power = max(min(255, power), 0)
    config = self.get_anchor_config(timeout)
    # A blob shorter than 20 bytes cannot hold the fields patched below;
    # report failure like every other error path instead of raising IndexError.
    if not config or len(config) < 20:
        return False
    config_array = bytearray(config)
    config_array[2] = power
    # NOTE(review): byte 19 is forced to 0; presumably this disables an
    # automatic power mode (cf. the commented-out set_auto_power). Confirm
    # against the protocol specification.
    config_array[19] = 0
    # The cached configuration still carries its CRC; drop it before re-framing.
    tx_str = b"\xeb\x20\xc2" + bytes(config_array[:-2])
    tx_str += crc16(tx_str)
    try:
        self.clear_cache(TYPE_SET_ANCHOR_CONFIG)
        self.ser.write(tx_str)
        start_time = time.time()
        while time.time() - start_time < timeout:
            time.sleep(0.05)
            bytes_data = self.get_cache(TYPE_SET_ANCHOR_CONFIG)
            if bytes_data:
                # A differing reply is an ordinary False, not an
                # AttributeError raised by calling .group() on a failed match.
                return b"\x81\xF5" in bytes_data
    except Exception as e:
        print(f"<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{e.args}")
    return False
# def set_auto_power(self, timeout):