122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
import json
|
||
import os.path
|
||
import sys
|
||
|
||
from matplotlib import pyplot as plt
|
||
|
||
from UWB.multi_uwb import *
|
||
|
||
# Records gathered by the collector thread; shared by every function below.
result = []
# Gate for the collector thread: it blocks in wait() until this is set.
# A freshly created Event is already unset, so no explicit clear() is needed.
__waiting_signal = threading.Event()
# Directory that receives the JSON dumps, located under sys.path[0].
RECORD_RESULT_DIR = os.path.join(sys.path[0], "DebugRecord")
# exist_ok=True avoids the check-then-create race of exists() + makedirs().
os.makedirs(RECORD_RESULT_DIR, exist_ok=True)
|
||
|
||
|
||
def data_collect(uwb: MultiUWB):
    """Continuously move records from ``uwb`` into the module-level ``result``.

    Runs forever, so it is meant to be the target of a background thread.
    Each iteration blocks until ``__waiting_signal`` is set, then asks
    ``uwb.get_data()`` for one item. A truthy item is printed, has its
    ``'status'`` entry removed and is appended to ``result``.

    Args:
        uwb: Device wrapper that provides ``get_data()``.
    """
    while True:
        # Park here while the event is cleared (recording paused).
        __waiting_signal.wait()
        data = uwb.get_data()
        if not data:
            continue
        print(data)
        # pop() instead of ``del``: a record without 'status' must not raise
        # KeyError, which would silently terminate this collector thread.
        data.pop('status', None)
        # NOTE(review): data_processing() reads data["record_time"]; presumably
        # the record already carries it now that the line below is disabled.
        # Verify against MultiUWB.get_data().
        # data.update({"record_time": time.time()})
        result.append(data)
|
||
|
||
|
||
def clear_all_data(uwb: MultiUWB):
    """Drop everything recorded so far, on the device and locally.

    Calls ``uwb.clear_all_record()`` first and then empties the module-level
    ``result`` list in place, so other references to the list see it empty.

    Args:
        uwb: Device wrapper that provides ``clear_all_record()``.
    """
    uwb.clear_all_record()
    # In-place truncation; the list object itself is kept.
    del result[:]
|
||
|
||
|
||
def data_processing():
    """Draw scatter plots of the collected records, per tag and per anchor.

    Reads the module-level ``result`` list and builds a 2x2 grid of axes
    (y axis shared within each row). The top row gets one scatter series per
    tag, the bottom row one series per anchor. In both rows the left panel
    plots ``record_time`` on x and the right panel plots the ``TOTAL_RSSI``
    value on x; y is always the tag/anchor identifier. The figure is then
    displayed with ``plt.show()``.

    Raises:
        KeyError: If a record lacks ``"record_time"``, ``TAG_ID``,
            ``ANCHOR_ID`` or ``TOTAL_RSSI``.
    """
    figure, axes = plt.subplots(2, 2, sharey="row")
    by_tag = {}
    by_anchor = {}

    # Bucket every record twice: once under its tag, once under its anchor.
    for entry in result:
        stamp = entry["record_time"]
        tag_id = entry[TAG_ID]
        anchor_id = entry[ANCHOR_ID]
        rssi = entry[TOTAL_RSSI]
        by_tag.setdefault(tag_id, []).append([stamp, rssi])
        by_anchor.setdefault(anchor_id, []).append([stamp, rssi])

    # Row 0 shows the tag groups, row 1 the anchor groups.
    for row, groups in enumerate((by_tag, by_anchor)):
        for ident, samples in groups.items():
            stamps = [sample[0] for sample in samples]
            levels = [sample[1] for sample in samples]
            # Every point of a series sits at the same y: its identifier.
            idents = [ident] * len(samples)
            axes[row, 0].scatter(x=stamps, y=idents, s=1)
            axes[row, 1].scatter(x=levels, y=idents, s=1)

    figure.subplots_adjust(wspace=0)
    plt.show()
|
||
|
||
|
||
print("Uwb测试软件启动成功(Made By 广东泳华科技)")
# Initialize the device.
uwb = MultiUWB()
uwb.start()
print("正在将该基站设置为定位模式。。。")
# Retry once per second until start_all_record() returns a truthy value.
while True:
    if uwb.start_all_record():
        break
    time.sleep(1)
print("设置成功!")
# Background collector; as a daemon thread it does not block interpreter exit.
collector_thread = threading.Thread(target=data_collect, args=(uwb,), daemon=True)
collector_thread.start()

# Start collecting data: print the command reference before reading input.
help_text = (
    "______________________________\n"
    "|命令代号\t|命令动作\n"
    "| 【s】 \t|开始记录\n"
    "| 【p】 \t|显示结果并保存已收集数据\n"
    "| 【r】 \t|显示已收集的数据数量\n"
    "| 【h】 \t|显示帮助\n"
    "| 【q】 \t| 退出 \n"
    "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
)
print(help_text)
|
||
while True:
|
||
cmd = input("请输入命令:")
|
||
if cmd == "s":
|
||
print("正在启动基站")
|
||
while not uwb.start_all_record():
|
||
time.sleep(1)
|
||
print("设置成功!")
|
||
__waiting_signal.set()
|
||
elif cmd == "p":
|
||
__waiting_signal.clear()
|
||
data_processing()
|
||
print("正在关闭基站")
|
||
while not uwb.stop_all_record():
|
||
time.sleep(1)
|
||
print("设置成功!")
|
||
with open(os.path.join(RECORD_RESULT_DIR, f"{time.time_ns()}.json"), "w", encoding="utf-8_sig") as file:
|
||
json.dump(result, file, ensure_ascii=True, indent=2)
|
||
clear_all_data(uwb)
|
||
elif cmd == "h":
|
||
print(
|
||
"______________________________\n"
|
||
"|命令代号\t|命令动作\n"
|
||
"| 【s】 \t|开始记录\n"
|
||
"| 【p】 \t|显示结果并保存已收集数据\n"
|
||
"| 【r】 \t|显示已收集的数据数量\n"
|
||
"| 【h】 \t|显示帮助\n"
|
||
"| 【q】 \t| 退出 \n"
|
||
"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
|
||
)
|
||
elif cmd == "q":
|
||
break
|
||
elif cmd == "r":
|
||
print(f"已收集到{len(result)}条数据!")
|
||
elif cmd == "c":
|
||
uwb.clear_all_record()
|