92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
import json
|
||
import sys
|
||
import threading
|
||
import time
|
||
|
||
from Speaker.speak_base import beep
|
||
from UWB.consensus import *
|
||
from UWB.positioning_standalone_v2 import Positioning
|
||
|
||
# Module-wide event flag for factory (calibration) mode; set_anchor() clears
# it once calibration has finished.
FACTORY_MODE: threading.Event = threading.Event()

# Command code that selects the long-run anchor calibration handler.
SET_ANCHOR: str = "setanchor"

# First entry of sys.path; used as the base directory for data files.
PATH: str = sys.path[0]

# JSON file that receives the per-anchor average RSSI written by set_anchor().
ANCHOR_RSSI_ADJUSTMENT_PATH: str = f"{PATH}/Exercise3/anchor_adjustment.json"
|
||
|
||
|
||
# Calibration handler
def set_anchor(value: str, eb):
    """Interactively calibrate the RSSI baseline of every long-run anchor.

    The routine asks the operator (via speech) to present one wristband,
    detects that tag, gives a five-second countdown, and then collects RSSI
    readings of that tag from the positioning stream. When every anchor in
    ``positioning.multi_uwb.uwb_driver_table`` has gathered at least
    ``adjustment_num`` readings, the per-anchor average is written as JSON to
    ``ANCHOR_RSSI_ADJUSTMENT_PATH`` and ``FACTORY_MODE`` is cleared.

    Args:
        value: Command payload. Not used by this handler; it is part of the
            shared handler signature.
        eb: Object exposing ``positioning`` (a ``Positioning``) and
            ``speak_driver`` (provides ``add_speak`` / ``wait_4_speak``).

    Returns:
        The status message that was also queued for speech: a failure
        message when no wristband was detected, otherwise the completion
        message.
    """
    positioning: Positioning = eb.positioning
    speak_driver = eb.speak_driver

    speak_driver.add_speak("准备进行长跑基站校准!请将一个手环放置于屏幕前!")
    speak_driver.wait_4_speak()

    # NOTE(review): the meaning of the argument 3 is not visible here
    # (presumably a timeout or retry count) -- confirm against Positioning.
    recognized_tag = positioning.get_detect_tag(3)
    if not recognized_tag:
        mes = "未识别到手环,请重新发送命令!"
        speak_driver.add_speak(mes)
        return mes

    speak_driver.add_speak("识别手环成功")
    print("识别到手环:", recognized_tag)
    speak_driver.wait_4_speak()

    anchor_list = list(positioning.multi_uwb.uwb_driver_table.keys())
    speak_driver.add_speak(
        f"共有{len(anchor_list)}个基站待校准,"
        f"请在5秒内到达识别边界,"
        f"滴声响后开始自动校准!"
    )
    speak_driver.wait_4_speak()

    # Spoken countdown 5..1, one number per second.
    for i in range(5):
        speak_driver.add_speak(f"{5 - i}")
        time.sleep(1)
    speak_driver.wait_4_speak()

    beep(freq=5000, duration=1000)
    speak_driver.add_speak("开始校准基站!")

    start_time = time.time()
    anchor_rssi_table = {}
    adjustment_num = 100   # readings required per anchor
    beep_count_max = 20    # emit a progress beep every this many readings
    beep_counter = 0

    # Anchors that still need readings. Completion is judged on exactly the
    # anchors that are averaged below, and each anchor is counted once.
    # Previously every reading past the quota bumped the finished counter
    # again, so a single anchor could end the loop and the averaging step
    # would then fail for anchors that had no readings.
    pending_anchors = set(anchor_list)

    # NOTE(review): a threading.Event object is always truthy, so this is
    # effectively ``while True`` and only ``break`` leaves the loop. Switching
    # to ``FACTORY_MODE.is_set()`` requires confirming that the caller sets
    # the event before invoking this handler.
    while FACTORY_MODE:
        record = positioning.get_last_information()
        if not record:
            continue
        record_time, data = record
        # Ignore readings captured before calibration started.
        if record_time < start_time:
            continue

        anchor = data[ANCHOR_ID]
        tag = data[TAG]
        rssi = data[RSSI]
        if tag != recognized_tag:
            continue

        samples = anchor_rssi_table.setdefault(anchor, [])
        samples.append(rssi)
        print(data)

        beep_counter += 1
        if beep_counter >= beep_count_max:
            beep(freq=3000, duration=300)
            beep_counter = 0

        if len(samples) < adjustment_num or anchor not in pending_anchors:
            continue

        pending_anchors.discard(anchor)
        finish_count = len(anchor_list) - len(pending_anchors)
        speak_driver.add_speak(f"{finish_count}个基站完成校准!")
        if not pending_anchors:
            break

    # Persist the average RSSI per anchor.
    anchor_rssi_adjustment = {
        anchor: sum(anchor_rssi_table[anchor]) / len(anchor_rssi_table[anchor])
        for anchor in anchor_list
    }
    with open(ANCHOR_RSSI_ADJUSTMENT_PATH, "w", encoding="utf-8_sig") as file:
        json.dump(anchor_rssi_adjustment, file)

    speak_driver.add_speak("已完成所有基站的校准!")
    FACTORY_MODE.clear()
    return "已完成所有基站的校准!"
|
||
|
||
|
||
# Dispatch table: command code -> handler function.
FUNC_TABLE = {SET_ANCHOR: set_anchor}
|