LISHUZUOXUN_yangjiang/uwb_data_getter.py

122 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os.path
import sys
from matplotlib import pyplot as plt
from UWB.multi_uwb import *
# Records gathered by the collector thread: data_collect() appends to this
# list and clear_all_data() empties it.
result = []

# Gate for the collector thread: data_collect() blocks on wait() until the
# event is set. A freshly created Event starts out cleared, so recording is
# paused until something calls set().
__waiting_signal = threading.Event()

# Directory that receives the JSON dumps, located next to the entry script
# (sys.path[0]). exist_ok=True makes creation idempotent and avoids the
# check-then-create race of a separate os.path.exists() test.
RECORD_RESULT_DIR = os.path.join(sys.path[0], "DebugRecord")
os.makedirs(RECORD_RESULT_DIR, exist_ok=True)
def data_collect(uwb: MultiUWB):
    """Continuously pull records from *uwb* and append them to ``result``.

    Loops forever and never returns. Each pass blocks until
    ``__waiting_signal`` is set, then calls ``uwb.get_data()``. A truthy
    record is printed, has its ``'status'`` entry removed, and is appended
    to the module-level ``result`` list; a falsy record is ignored.

    Args:
        uwb: Device wrapper providing ``get_data()``.
    """
    while True:
        # Parks here while the event is cleared (recording paused).
        __waiting_signal.wait()
        data = uwb.get_data()
        if not data:
            continue
        print(data)
        # pop() instead of del: a record that lacks 'status' must not raise
        # KeyError, which would terminate this loop (and its thread) for good.
        data.pop('status', None)
        # NOTE(review): data_processing() reads "record_time" from every
        # record, and nothing here adds it -- presumably get_data() supplies
        # it; confirm against MultiUWB.
        result.append(data)
def clear_all_data(uwb: MultiUWB):
    """Discard everything recorded so far, on the device and locally.

    Args:
        uwb: Device wrapper; its ``clear_all_record()`` is invoked first.
    """
    uwb.clear_all_record()
    # Empty the shared list in place so the same list object stays in use.
    del result[:]
def data_processing():
    """Scatter-plot every collected record, grouped by tag and by anchor.

    Reads the module-level ``result`` list; each record must provide the
    ``"record_time"``, ``TAG_ID``, ``ANCHOR_ID`` and ``TOTAL_RSSI`` keys.
    The figure is a 2x2 grid whose rows share a y-axis:

    * row 0 - one series per ``TAG_ID`` value, plotted at y = that value;
    * row 1 - one series per ``ANCHOR_ID`` value, plotted at y = that value;
    * column 0 - x is the record time;
    * column 1 - x is the ``TOTAL_RSSI`` value.

    The figure is displayed with ``plt.show()``; nothing is returned.
    """
    figure, axes = plt.subplots(2, 2, sharey="row")

    samples_by_tag = {}
    samples_by_anchor = {}
    for entry in result:
        stamp = entry["record_time"]
        tag_id = entry[TAG_ID]
        anchor_id = entry[ANCHOR_ID]
        total_rssi = entry[TOTAL_RSSI]
        samples_by_tag.setdefault(tag_id, []).append([stamp, total_rssi])
        samples_by_anchor.setdefault(anchor_id, []).append([stamp, total_rssi])

    # Row 0 is keyed by tag, row 1 by anchor; each key becomes its own series.
    for row, grouped in enumerate((samples_by_tag, samples_by_anchor)):
        for key, samples in grouped.items():
            stamps = [sample[0] for sample in samples]
            rssi_values = [sample[1] for sample in samples]
            ids = [key] * len(samples)
            axes[row, 0].scatter(x=stamps, y=ids, s=1)
            axes[row, 1].scatter(x=rssi_values, y=ids, s=1)

    # Remove the horizontal gap between the two columns.
    figure.subplots_adjust(wspace=0)
    plt.show()
# Command reference, printed once at startup and again on the "h" command.
_HELP_TEXT = (
    "______________________________\n"
    "|命令代号\t|命令动作\n"
    "| 【s】 \t|开始记录\n"
    "| 【p】 \t|显示结果并保存已收集数据\n"
    "| 【r】 \t|显示已收集的数据数量\n"
    "| 【h】 \t|显示帮助\n"
    "| 【q】 \t| 退出 \n"
    "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
)


def _retry_every_second(action):
    """Call *action* until it returns a truthy value, sleeping 1 s between tries."""
    while not action():
        time.sleep(1)


print("Uwb测试软件启动成功Made By 广东泳华科技)")
# Initialise the device.
uwb = MultiUWB()
uwb.start()
print("正在将该基站设置为定位模式。。。")
_retry_every_second(uwb.start_all_record)
print("设置成功!")
# The collector runs as a daemon thread, so it does not keep the process
# alive once the command loop below exits.
threading.Thread(target=data_collect, args=(uwb,), daemon=True).start()
# Start collecting data: show the command reference, then read commands.
print(_HELP_TEXT)
while True:
    cmd = input("请输入命令:")
    if cmd == "q":
        break
    if cmd == "s":
        # Start recording on the device, then release the collector thread.
        print("正在启动基站")
        _retry_every_second(uwb.start_all_record)
        print("设置成功!")
        __waiting_signal.set()
    elif cmd == "p":
        # Pause the collector, plot, stop the device, dump to JSON, reset.
        __waiting_signal.clear()
        data_processing()
        print("正在关闭基站")
        _retry_every_second(uwb.stop_all_record)
        print("设置成功!")
        dump_path = os.path.join(RECORD_RESULT_DIR, f"{time.time_ns()}.json")
        with open(dump_path, "w", encoding="utf-8_sig") as file:
            json.dump(result, file, ensure_ascii=True, indent=2)
        clear_all_data(uwb)
    elif cmd == "h":
        print(_HELP_TEXT)
    elif cmd == "r":
        print(f"已收集到{len(result)}条数据!")
    elif cmd == "c":
        # Not listed in the help text; clears only the device-side record and
        # leaves the local ``result`` list untouched.
        uwb.clear_all_record()