LISHUZUOXUN_yangjiang/UWB/uwb_driver.py

788 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=gb2312
import binascii
import re
import threading
import time
import serial
import serial.tools.list_ports
from LogRecord.log_recorder import GLOBAL_LOG
def crc16(data):
# 初始化crc为0xFFFF
crc = 0xFFFF
# 循环处理每个数据字节
for byte in data:
# 将每个数据字节与crc进行异或操作
crc ^= byte
# 对crc的每一位进行处理
for _ in range(8):
# 如果最低位为1则右移一位并执行异或0xA001操作(即0x8005按位颠倒后的结果)
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
# 如果最低位为0则仅将crc右移一位
else:
crc = crc >> 1
# 返回最终的crc值
return crc.to_bytes(2, byteorder='little', signed=False)
# 检测uwb模块方法
def detect_uwb(ser):
cmd = 'EB01FF4024'
tx_str = bytes(bytearray.fromhex(cmd))
ser.write(tx_str)
time.sleep(0.5)
hex_str = binascii.b2a_hex(ser.read_all()).decode().upper().strip()
if cmd in hex_str:
return True
else:
return False
# 标签状态
MODE_NORMAL = b"\xab"
MODE_ALARM = b"\xee"
# 标签警报状态
MODE_UNWEAR = 0
MODE_WEAR = 1
MODE_ILLEGAL = 2
ANCHOR_ID = "anchor_id"
TAG_ID = "tag_id"
BATTERY = "battery"
STATUS = "status"
DISTANCE = "distance"
DYNAMIC_FREQ = "dynamic_freq"
INACTIVE_FREQ = "inactive_freq"
ALARM_FREQ = "alarm_freq"
HR = "hr"
BO = "bo"
DETECT = "detect"
HR_ALARM_VALUE_MAX = "hr_alarm_value_max"
BO_ALARM_VALUE_MIN = "bo_alarm_value_min"
HR_OPT_VALUE_MAX = "hr_opt_value_max"
HR_OPT_VALUE_MIN = "hr_opt_value_min"
BO_OPT_VALUE_MAX = "bo_opt_value_max"
BO_OPT_VALUE_MIN = "bo_opt_value_min"
ILLEGAL_ALARM_TIMES = "illegal_alarm_times"
RECORD = "record_time"
IS_SUCCESSFUL = "is_successful"
FIRST_RSSI = "first_rssi"
TOTAL_RSSI = "total_rssi"
def data_decode_func_1_5(data):
anchor_id_reverse = data[0: 8]
tag_id_reverse = data[8: 14]
tag_battery = data[14]
tag_status = data[15:16]
tag_is_successful = data[17]
tag_distance = data[18:20]
tag_first_rssi = data[20:22]
tag_total_rssi = data[22:24]
tag_dynamic_freq = data[25:27]
tag_inactive_freq = data[27:29]
tag_alarm_freq = data[29:21]
tag_hr = data[34]
tag_bo = data[35]
tag_detect = data[38]
hr_alarm_value_max = data[39]
bo_alarm_value_min = data[40]
hr_opt_value_max = data[41]
hr_opt_value_min = data[42]
bo_opt_value_max = data[43]
bo_opt_value_min = data[44]
illegal_alarm_times = data[45]
# 数据解析
anchor_id = anchor_id_reverse[::-1].hex()
tag_id = tag_id_reverse[::-1].hex()
return {
ANCHOR_ID: anchor_id,
TAG_ID: tag_id,
BATTERY: tag_battery,
STATUS: tag_status,
DISTANCE: int.from_bytes(tag_distance, byteorder="big", signed=False),
IS_SUCCESSFUL: tag_is_successful,
FIRST_RSSI: int.from_bytes(tag_first_rssi, byteorder="big", signed=True),
TOTAL_RSSI: int.from_bytes(tag_total_rssi, byteorder="big", signed=True),
DYNAMIC_FREQ: int.from_bytes(tag_dynamic_freq, byteorder="big", signed=False),
INACTIVE_FREQ: int.from_bytes(tag_inactive_freq, byteorder="big", signed=False),
ALARM_FREQ: int.from_bytes(tag_alarm_freq, byteorder="big", signed=False),
HR: tag_hr,
BO: tag_bo,
DETECT: tag_detect,
HR_ALARM_VALUE_MAX: hr_alarm_value_max,
BO_ALARM_VALUE_MIN: bo_alarm_value_min,
HR_OPT_VALUE_MAX: hr_opt_value_max,
HR_OPT_VALUE_MIN: hr_opt_value_min,
BO_OPT_VALUE_MAX: bo_opt_value_max,
BO_OPT_VALUE_MIN: bo_opt_value_min,
ILLEGAL_ALARM_TIMES: illegal_alarm_times,
RECORD: time.time()
}
def data_decode_func(data):
# 源数据解析
anchor_id_reverse = data[0: 8]
tag_id_reverse = data[8: 14]
tag_battery = data[14]
tag_status = data[15:16]
tag_distance = data[17:19]
tag_dynamic_freq = data[24:26]
tag_inactive_freq = data[26:28]
tag_alarm_freq = data[28:30]
tag_hr = data[33]
tag_bo = data[34]
tag_detect = data[37]
hr_alarm_value_max = data[38]
bo_alarm_value_min = data[39]
hr_opt_value_max = data[40]
hr_opt_value_min = data[41]
bo_opt_value_max = data[42]
bo_opt_value_min = data[43]
illegal_alarm_times = data[44]
# 数据解析
anchor_id = anchor_id_reverse[::-1].hex()
tag_id = tag_id_reverse[::-1].hex()
return {
ANCHOR_ID: anchor_id,
TAG_ID: tag_id,
BATTERY: tag_battery,
STATUS: tag_status,
DISTANCE: int.from_bytes(tag_distance, byteorder="big", signed=False),
DYNAMIC_FREQ: int.from_bytes(tag_dynamic_freq, byteorder="big", signed=False),
INACTIVE_FREQ: int.from_bytes(tag_inactive_freq, byteorder="big", signed=False),
ALARM_FREQ: int.from_bytes(tag_alarm_freq, byteorder="big", signed=False),
HR: tag_hr,
BO: tag_bo,
DETECT: tag_detect,
HR_ALARM_VALUE_MAX: hr_alarm_value_max,
BO_ALARM_VALUE_MIN: bo_alarm_value_min,
HR_OPT_VALUE_MAX: hr_opt_value_max,
HR_OPT_VALUE_MIN: hr_opt_value_min,
BO_OPT_VALUE_MAX: bo_opt_value_max,
BO_OPT_VALUE_MIN: bo_opt_value_min,
ILLEGAL_ALARM_TIMES: illegal_alarm_times,
RECORD: time.time()
}
# 基站状态
MODE_FREE = 0
MODE_POSITION = 1
MODE_TWR = 2
# 指令类型
TYPE_DATA = b'\xa0'
TYPE_READ_STATUS = b'\xa3'
TYPE_WRITE_STATUS = b'\xc3'
TYPE_WRITE_CONFIG = b'\xc4'
TYPE_REBOOT = b'\xd0'
TYPE_TWR = b'\xc5'
TYPE_TWR_RESULT = b'\xa1'
TYPE_TEMP = b'\xa5'
TYPE_TAG_CONFIG = b'\xc7'
TYPE_ANCHOR_CONFIG = b'\xa2'
TYPE_SET_ANCHOR_CONFIG = b'\xc2'
class MySerial(serial.Serial):
def write(self, data):
# TODO:调试
# GLOBAL_LOG.write("write:" + data.hex())
return super().write(data)
def read(self, size=1):
# TODO:调试
data = super().read(size)
# GLOBAL_LOG.write("read:" + data.hex())
return data
class UwbController:
device_list = []
def __init__(self, uwb_timeout=0.1) -> None:
super().__init__()
"""************************************线程信号初始化************************************"""
self.__running = threading.Event()
self.__running.set()
self.__data_signal = threading.Event()
self.__data_signal.clear()
self.__resume = threading.Event()
self.__resume.set()
self.__get_new_position_data = threading.Event()
self.__get_new_position_data.clear()
"""************************************初始化设备驱动************************************"""
self.anchor_id = None
self.anchor_version = None
# 串口设置
self.ser = None
for port in list(serial.tools.list_ports.comports()):
self.ser = MySerial(timeout=uwb_timeout, baudrate=115200) # 初始化串口模块
if str(port.device) in UwbController.device_list:
continue
if 'CP210' in str(port.description):
self.ser.port = str(port.device)
try:
self.ser.open() # 打开串口
self.set_anchor_status_directly(MODE_FREE)
time.sleep(0.5)
# 尝试获取UWB模块的信息
if detect_uwb(self.ser):
UwbController.device_list.append(str(port.device))
self.anchor_id, self.anchor_version = self.get_anchor_mes()
print(f"自身基站ID为{self.anchor_id},基站版本为:{self.anchor_version}")
break
except Exception:
if self.ser.isOpen():
self.ser.close()
"""************************************设备状态测试************************************"""
self.is_start = True
if self.anchor_id is None:
self.is_start = False
if not self.ser.isOpen():
self.is_start = False
"""************************************数据初始化************************************"""
self.buffer = b""
self.data_cache = {}
self.decode_func = {
TYPE_DATA: data_decode_func_1_5
}
# 状态检测
def is_initialed(self):
return self.is_start
"""************************************线程控制模块************************************"""
def start(self):
threading.Thread(target=self._threading_data_recv, daemon=True).start()
threading.Thread(target=self._threading_data_decode, daemon=True).start()
def pause(self):
self.__resume.clear()
def resume(self):
self.__resume.set()
def stop(self):
self.__resume.set()
self.__data_signal.set()
self.__running.clear()
"""************************************上层驱动方法************************************"""
def get_cache(self, _type):
if _type not in self.data_cache.keys():
return None
else:
try:
if self.data_cache.get(_type):
return self.data_cache[_type].pop(0)
else:
return None
except KeyError:
return None
def clear_cache(self, _type):
if _type in self.data_cache.keys():
if self.data_cache[_type]:
self.data_cache[_type].clear()
def clear_all(self):
self.data_cache = {}
self.buffer = b""
def _threading_data_recv(self):
while self.__running.is_set():
self.__resume.wait()
try:
bytes_data = self.ser.read(128)
# print(len(bytes_data))
if bytes_data:
self.buffer += bytes_data
self.__data_signal.set()
except:
time.sleep(0.001)
def _threading_data_decode(self):
while self.__running.is_set():
self.__resume.wait()
try:
self.__data_signal.wait()
self.__data_signal.clear()
while True:
eb_cmd_re = re.search(b"\xeb[\x00-\xff]{3,}", self.buffer[0:256])
if eb_cmd_re:
# 获取指令的头尾坐标
re_start = eb_cmd_re.start()
re_end = eb_cmd_re.end()
# 更新缓存
self.buffer = self.buffer[re_start::]
# 获得指令的长度
eb_len = self.buffer[1]
# 判断是否已经接受到足够长的指令+CRC校验码长度2
if eb_len + 2 <= len(self.buffer[2::]):
# 提取数据
data = self.buffer[3: 3 + eb_len + 1]
# 获取指令类型
eb_type = self.buffer[2:3]
# crc校验
whole_data = self.buffer[0: eb_len + 2]
crc = crc16(whole_data)
crc_got = self.buffer[eb_len + 2: eb_len + 2 + 2]
# 更新缓存
self.buffer = self.buffer[1 + eb_len::]
if crc_got != crc:
# print("uwb接受到错误数据包已丢弃")
continue
# 获得指定指令方法
if eb_type in self.decode_func.keys():
decode_data = self.decode_func[eb_type](data)
else:
decode_data = data
# 保存信息
self.data_cache.setdefault(eb_type, [])
self.data_cache[eb_type].append(decode_data)
if eb_type == TYPE_DATA:
self.__get_new_position_data.set()
else:
break
except Exception as e:
continue
print(f"发生错误:{e.args}")
# 获得基站的版本信息
def get_anchor_mes(self):
cmd = 'EB01C00034'
tx_str = bytes(bytearray.fromhex(cmd))
try:
self.ser.write(tx_str)
time.sleep(0.5)
bytes_data = self.ser.read_all()
info = re.search(rb"(?<=\xEB\x0E\xC0)[\x00-\xFF]{13}", bytes_data).group()
anchor_id_rev = [info[i:i + 1].hex() for i in range(8)]
anchor_id_rev.reverse()
anchor_id = "".join(anchor_id_rev)
anchor_version = "{}.{}.{}.{}".format(int(info[8]), int(info[9]), int(info[10]), int(info[11]))
return anchor_id, anchor_version
except Exception as e:
print(f"发生错误:{e.args}")
return
# 读取基站状态
def get_anchor_status(self, timeout=1):
cmd = 'EB01A3401D'
tx_str = bytes(bytearray.fromhex(cmd))
try:
self.clear_cache(TYPE_READ_STATUS)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_READ_STATUS)
if bytes_data:
info = re.search(rb"[\x00-\xFF]{3}", bytes_data).group()
status = info[0]
return status
except Exception as e:
print(f"发生错误:{e.args}")
return
# 直接设置基站状态
def set_anchor_status_directly(self, status):
status_bytes = status.to_bytes(1, byteorder='little', signed=False)
tx_str = b"\xEB\x02\xC3" + status_bytes
crc = crc16(tx_str)
tx_str += crc
try:
self.ser.write(tx_str)
except Exception as e:
print(f"发生错误:{e.args}")
# 关机模式:关闭周边所有的手环
def close_all_tag(self, ):
tx_str = b"\xEB\x02\xC3\x04\xC5\x33"
try:
self.ser.write(tx_str)
except Exception as e:
print(f"发生错误:{e.args}")
# 关闭关机模式:停止关闭周边所有的手环
def stop_close_all_tag(self):
tx_str = b"\xEB\x02\xC3\x00\xC4\xF0"
try:
self.ser.write(tx_str)
except Exception as e:
print(f"发生错误:{e.args}")
# 设置基站状态
def set_anchor_status(self, status, timeout=1):
status_bytes = status.to_bytes(1, byteorder='little', signed=False)
tx_str = b"\xEB\x02\xC3" + status_bytes
crc = crc16(tx_str)
tx_str += crc
try:
self.clear_cache(TYPE_WRITE_STATUS)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_WRITE_STATUS)
if bytes_data:
info = re.search(status_bytes + crc, bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
# 设置基站顺序和编号
def set_anchor_config(self, anchor_seq: int, anchor_no: int, timeout=1):
anchor_seq_bytes = anchor_seq.to_bytes(1, byteorder='little', signed=False)
anchor_no_bytes = anchor_no.to_bytes(2, byteorder='big', signed=False)
tx_str = b"\xEB\x04\xC4" + anchor_seq_bytes + anchor_no_bytes
crc = crc16(tx_str)
tx_str += crc
try:
self.clear_cache(TYPE_WRITE_CONFIG)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_WRITE_CONFIG)
if bytes_data:
info = re.search(anchor_seq_bytes + anchor_no_bytes + crc, bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
# 重启基站
def reboot(self, timeout=5):
cmd = 'EB01D001F8'
tx_str = bytes(bytearray.fromhex(cmd))
try:
self.clear_cache(TYPE_REBOOT)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_REBOOT)
if bytes_data:
info = re.search(b"\x01\xF8", bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
def twr(self, target_anchor: str, timeout=10):
anchor_reverse = bytes(bytearray.fromhex(target_anchor))[::-1]
cmd = 'EB0AC501'
tx_str = bytes(bytearray.fromhex(cmd))
tx_str += anchor_reverse
crc = crc16(tx_str)
tx_str += crc
check_code = "010690"
check_code = bytes(bytearray.fromhex(check_code))
try:
self.clear_cache(TYPE_TWR)
self.clear_cache(TYPE_TWR_RESULT)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_TWR)
if bytes_data:
info = re.search(check_code, bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
# 获得基站间的TWR测距值
def get_twr_result(self, timeout=10):
start_time = time.time()
while time.time() - start_time < timeout:
try:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_TWR_RESULT)
if bytes_data:
distance = int.from_bytes(bytes_data[0:2], byteorder="big", signed=False)
success_time = int.from_bytes(bytes_data[2:4], byteorder="big", signed=False)
return {
"distance": distance, "success_time": success_time
}
except Exception as e:
print(f"发生错误:{e.args}")
# 获得模块温度
def get_temp(self, timeout=1):
cmd = 'EB01A5C01F'
tx_str = bytes(bytearray.fromhex(cmd))
try:
self.clear_cache(TYPE_TEMP)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_TEMP)
if bytes_data:
info = bytes_data[0:2]
temp = int.from_bytes(info, byteorder="big", signed=False) * 0.01
return temp
except Exception as e:
print(f"发生错误:{e.args}")
return False
# 获取测距数据
def get_data(self, timeout=0.1):
self.__get_new_position_data.wait(timeout=timeout)
data = self.get_cache(TYPE_DATA)
if not data:
self.__get_new_position_data.clear()
return data
# 清理标签
def clear_tag(self, tag_id, timeout=5):
tag_id_reverse = bytes(bytearray.fromhex(tag_id))[::-1]
cmd = 'EB19C7'
tx_str = bytes(bytearray.fromhex(cmd))
tx_str += tag_id_reverse
tx_str += b"\x01"
config_enable = b"\x00"
d_freq_bytes = b"\x00\x00"
i_freq_bytes = b"\x00\x00"
a_freq_bytes = b"\x00\x00"
tx_str += config_enable + d_freq_bytes + i_freq_bytes + a_freq_bytes
# 功率
power_enable = b'\x00'
power_config = b'\x00'
tx_str += power_enable + power_config
# 状态检测
hr_alarm_value_max_bytes = b"\x00"
bo_alarm_value_min_bytes = b"\x00"
hr_opt_value_max_bytes = b"\x00"
hr_opt_value_min_bytes = b"\x00"
bo_opt_value_max_bytes = b"\x00"
bo_opt_value_min_bytes = b"\x00"
tx_str += hr_alarm_value_max_bytes + bo_alarm_value_min_bytes \
+ hr_opt_value_max_bytes + hr_opt_value_min_bytes \
+ bo_opt_value_max_bytes + bo_opt_value_min_bytes
# 警报
alarm_enable = b"\x02"
alarm_config = b"\x00"
tx_str += alarm_enable + alarm_config
tx_str += crc16(tx_str)
try:
self.clear_cache(TYPE_TAG_CONFIG)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_TAG_CONFIG)
if bytes_data:
info = re.search(b"\x41\xF6", bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
# 关闭标签
def close_tag(self, tag_id, timeout=5):
tag_id_reverse = bytes(bytearray.fromhex(tag_id))[::-1]
cmd = 'EB19C7'
tx_str = bytes(bytearray.fromhex(cmd))
tx_str += tag_id_reverse
tx_str += b"\x01"
config_enable = b"\x00"
d_freq_bytes = b"\x00\x00"
i_freq_bytes = b"\x00\x00"
a_freq_bytes = b"\x00\x00"
tx_str += config_enable + d_freq_bytes + i_freq_bytes + a_freq_bytes
# 功率
power_enable = b'\x00'
power_config = b'\x00'
tx_str += power_enable + power_config
# 状态检测
hr_alarm_value_max_bytes = b"\x00"
bo_alarm_value_min_bytes = b"\x00"
hr_opt_value_max_bytes = b"\x00"
hr_opt_value_min_bytes = b"\x00"
bo_opt_value_max_bytes = b"\x00"
bo_opt_value_min_bytes = b"\x00"
tx_str += hr_alarm_value_max_bytes + bo_alarm_value_min_bytes \
+ hr_opt_value_max_bytes + hr_opt_value_min_bytes \
+ bo_opt_value_max_bytes + bo_opt_value_min_bytes
# 警报
alarm_enable = b"\x03"
alarm_config = b"\x00"
tx_str += alarm_enable + alarm_config
tx_str += crc16(tx_str)
try:
self.clear_cache(TYPE_TAG_CONFIG)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_TAG_CONFIG)
if bytes_data:
info = re.search(b"\x41\xF6", bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
# 快速配置标签
def set_tag(self, tag_id, freq=None, alarm_status=None, detection_config=None, timeout=1):
tag_id_reverse = bytes(bytearray.fromhex(tag_id))[::-1]
cmd = 'EB19C7'
tx_str = bytes(bytearray.fromhex(cmd))
tx_str += tag_id_reverse
tx_str += b"\x01"
if freq or detection_config:
config_enable = b"\x01"
else:
config_enable = b"\x00"
if freq:
dynamic_freq, inactive_freq, alarm_freq = freq
d_freq_bytes = int(dynamic_freq).to_bytes(length=2, byteorder="big", signed=False)
i_freq_bytes = int(inactive_freq).to_bytes(length=2, byteorder="big", signed=False)
a_freq_bytes = int(alarm_freq).to_bytes(length=2, byteorder="big", signed=False)
else:
d_freq_bytes = b"\x00\x00"
i_freq_bytes = b"\x00\x00"
a_freq_bytes = b"\x00\x00"
tx_str += config_enable + d_freq_bytes + i_freq_bytes + a_freq_bytes
# 功率
power_enable = b'\x00'
power_config = b'\x00'
tx_str += power_enable + power_config
# 状态检测
if detection_config:
hr_alarm_value_max, bo_alarm_value_min, \
hr_opt_value_max, hr_opt_value_min, \
bo_opt_value_max, bo_opt_value_min = detection_config
hr_alarm_value_max_bytes = int(hr_alarm_value_max).to_bytes(length=1, byteorder="big", signed=False)
bo_alarm_value_min_bytes = int(bo_alarm_value_min).to_bytes(length=1, byteorder="big", signed=False)
hr_opt_value_max_bytes = int(hr_opt_value_max).to_bytes(length=1, byteorder="big", signed=False)
hr_opt_value_min_bytes = int(hr_opt_value_min).to_bytes(length=1, byteorder="big", signed=False)
bo_opt_value_max_bytes = int(bo_opt_value_max).to_bytes(length=1, byteorder="big", signed=False)
bo_opt_value_min_bytes = int(bo_opt_value_min).to_bytes(length=1, byteorder="big", signed=False)
else:
hr_alarm_value_max_bytes = b"\x00"
bo_alarm_value_min_bytes = b"\x00"
hr_opt_value_max_bytes = b"\x00"
hr_opt_value_min_bytes = b"\x00"
bo_opt_value_max_bytes = b"\x00"
bo_opt_value_min_bytes = b"\x00"
tx_str += hr_alarm_value_max_bytes + bo_alarm_value_min_bytes \
+ hr_opt_value_max_bytes + hr_opt_value_min_bytes \
+ bo_opt_value_max_bytes + bo_opt_value_min_bytes
# 警报
if alarm_status is not None:
alarm_enable = b"\x01"
if alarm_status:
alarm_config = b"\x01"
else:
alarm_config = b"\x02"
else:
alarm_enable = b"\x00"
alarm_config = b"\x00"
tx_str += alarm_enable + alarm_config
tx_str += crc16(tx_str)
try:
self.clear_cache(TYPE_TAG_CONFIG)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_TAG_CONFIG)
if bytes_data:
info = re.search(b"\x41\xF6", bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
def get_anchor_config(self, timeout=3):
cmd = 'EB02A20FAD64'
tx_str = bytes(bytearray.fromhex(cmd))
try:
self.clear_cache(TYPE_ANCHOR_CONFIG)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_ANCHOR_CONFIG)
if bytes_data:
info = re.search(rb"[\x00-\xFF]+", bytes_data).group()
return info
except Exception as e:
print(f"发生错误:{e.args}")
return
def set_anchor_power(self, power: int, timeout=5):
power = max(min(255, power), 0)
config = self.get_anchor_config(timeout)
if config:
config_array = bytearray(config)
config_array[2] = power
config_array[19] = 0
config = bytes(config_array[:-2])
tx_str = b"\xeb\x20\xc2" + config
tx_str += crc16(tx_str)
try:
self.clear_cache(TYPE_SET_ANCHOR_CONFIG)
self.ser.write(tx_str)
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(0.05)
bytes_data = self.get_cache(TYPE_SET_ANCHOR_CONFIG)
if bytes_data:
info = re.search(b"\x81\xF5", bytes_data).group()
if info:
return True
else:
return False
except Exception as e:
print(f"发生错误:{e.args}")
return False
else:
return False
# def set_auto_power(self, timeout):