LISHUZUOXUN_yangjiang/UWB/data_node_via_tcp.py

147 lines
4.9 KiB
Python

import os
import re
import socket
import threading
import msgpack
# Default TCP port used both by DataNode (listening) and send_2() (connecting).
DATA_NODE_DEFAULT_PORT = 7777
# Magic bytes that mark the start of every frame on the wire.
DATA_HEAD = b"\xDD\xAA\xAA"
# Regex matching IPv4 addresses in the 192.168.137.x segment. The dots are
# escaped so they only match a literal "."; the negative lookahead rejects
# addresses whose last octet starts with 255 (the broadcast address).
DEFAULT_IP_SEGMENT = r"192\.168\.137\.(?!255)[0-9]{1,3}"
# Get this machine's own IP address.
def get_self_ip_address():
    """Return the first address found by get_machine_ip_address().

    NOTE(review): presumably the local interface address is the first match
    in the ``arp -a`` output -- verify on the target platform.

    Raises:
        IndexError: if no address in the configured segment was found.
    """
    return get_machine_ip_address()[0]
# Get the devices on the same network segment.
def get_machine_ip_address():
    """Return every address in the ``arp -a`` output matching DEFAULT_IP_SEGMENT.

    Returns:
        list[str]: the matched address strings, in the order they appear in
        the command output (empty when nothing matches).
    """
    # Use the pipe as a context manager so the file object and the child
    # process are always released, even if read() fails.
    with os.popen("arp -a") as pipe:
        echo = pipe.read()
    return re.findall(DEFAULT_IP_SEGMENT, echo)
def send_2(domain, data, port=DATA_NODE_DEFAULT_PORT):
    """Serialize *data* with msgpack and send it as one frame over TCP.

    Frame layout: ``DATA_HEAD`` + 3-byte big-endian payload length + payload.

    Args:
        domain: host name or IP address of the receiving node.
        data: any object ``msgpack.dumps`` can serialize.
        port: TCP port of the receiving node.

    Returns:
        bool: True when the whole frame was handed to the socket, False when
        binding, connecting or sending failed.

    Raises:
        OverflowError: if the serialized payload does not fit the 3-byte
            length field.
        Exception: whatever ``msgpack.dumps`` raises for unserializable data.
    """
    # Serialization errors are programming errors and propagate to the caller;
    # only network failures are reported through the boolean result.
    payload = msgpack.dumps(data)
    frame = DATA_HEAD + len(payload).to_bytes(3, byteorder="big") + payload
    try:
        # The context manager closes the socket on every path, including
        # failures in bind/connect/sendall.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.bind((DataNode.SELF_IP, 0))
            client.settimeout(1)
            client.connect((domain, port))
            # sendall() keeps writing until the full frame is out; send() may
            # transmit only part of it.
            client.sendall(frame)
        return True
    except Exception:
        # Deliberate best-effort: any failure is reported as False.
        return False
class DataNode(threading.Thread):
    """TCP receiver that collects msgpack frames sent by ``send_2``.

    The thread itself (``run``) accepts one client at a time and appends the
    received bytes to ``self.data``; further clients queue in the listen
    backlog. A second daemon thread (``_thread_data_processing``) splits that
    buffer into frames (``DATA_HEAD`` + 3-byte big-endian length + payload),
    decodes each payload with msgpack and appends the result to
    ``self.package``, from which ``get_data`` pops messages in arrival order.
    """

    # Local address that send_2() binds its outgoing sockets to.
    SELF_IP = "0.0.0.0"
    # Size in bytes of the big-endian length field following DATA_HEAD.
    _LENGTH_BYTES = 3

    def __init__(self, domain="0.0.0.0", port=DATA_NODE_DEFAULT_PORT, report=False) -> None:
        """Create the listening socket.

        Args:
            domain: local address to listen on.
            port: local TCP port to listen on.
            report: when True, print connect/disconnect messages.

        Raises:
            OSError: if the socket cannot be bound or put into listening mode.
        """
        super().__init__(daemon=True)
        self.report = report
        self.domain = domain
        self.tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.tcp_server.bind((self.domain, port))
            self.tcp_server.listen(1024)
            self.tcp_server.settimeout(1)
        except Exception:
            # Do not leak the socket when the address is unavailable.
            self.tcp_server.close()
            raise
        self.data = b""
        self.package = []
        # Guards self.data and self.package, which are touched by the
        # receiver thread, the processing thread and the caller's thread.
        self.__data_lock = threading.Lock()
        self.__running = threading.Event()  # flag used to stop the threads
        self.__running.set()  # running = True
        self.__had_data = threading.Event()  # set when new bytes arrived

    def get_data(self):
        """Pop and return the oldest decoded message, or None if there is none."""
        with self.__data_lock:
            if self.package:
                return self.package.pop(0)
        return None

    def clear_data(self):
        """Discard all buffered raw bytes and all decoded messages."""
        with self.__data_lock:
            self.data = b""
            self.package.clear()

    def _take_frame(self):
        """Remove the next complete frame from the buffer and return its payload.

        Returns:
            bytes | None: the payload bytes, or None when the buffer does not
            yet hold a complete frame.
        """
        with self.__data_lock:
            buffer = self.data
            head_at = buffer.find(DATA_HEAD)
            if head_at < 0:
                # No header present: everything except a possible partial
                # header at the very end can never become part of a frame.
                keep = len(DATA_HEAD) - 1
                if len(buffer) > keep:
                    self.data = buffer[len(buffer) - keep:]
                return None
            length_at = head_at + len(DATA_HEAD)
            body_start = length_at + self._LENGTH_BYTES
            if len(buffer) < body_start:
                return None  # length field not fully received yet
            body_len = int.from_bytes(buffer[length_at:body_start], byteorder="big")
            body_end = body_start + body_len
            if len(buffer) < body_end:
                return None  # payload not fully received yet
            # Consume the frame (and any garbage that preceded its header).
            self.data = buffer[body_end:]
            return buffer[body_start:body_end]

    def _thread_data_processing(self):
        """Decode buffered frames into ``self.package`` until the node is stopped."""
        while self.__running.is_set():
            self.__had_data.wait()
            # Clear BEFORE draining: bytes that arrive while draining set the
            # event again, so their wake-up is never lost.
            self.__had_data.clear()
            while self.__running.is_set():
                payload = self._take_frame()
                if payload is None:
                    # Incomplete frame: sleep until more bytes arrive instead
                    # of spinning on the same buffer.
                    break
                # Parse the data.
                try:
                    message = msgpack.loads(payload)
                except Exception:
                    # Corrupt payload; its frame is already removed from the
                    # buffer, so skip it and continue with the next one.
                    continue
                with self.__data_lock:
                    self.package.append(message)

    def start(self) -> None:
        """Start the receiver thread and the frame-processing thread."""
        super().start()
        threading.Thread(target=self._thread_data_processing, daemon=True).start()

    def _receive_from(self, conn, connected_addr):
        """Read from one client into ``self.data`` until it disconnects or the node stops."""
        conn.settimeout(1)
        if self.report:
            print("[Data Node]{} connected!".format(connected_addr))
        while self.__running.is_set():
            try:
                chunk = conn.recv(8192)
            except socket.timeout:
                # Nothing received within the timeout; re-check the stop flag.
                continue
            except OSError:
                break  # connection reset or otherwise broken
            if not chunk:
                break  # orderly shutdown by the peer
            with self.__data_lock:
                self.data += chunk
            self.__had_data.set()
        if self.report:
            print("[Data Node]{} disconnected!".format(connected_addr))

    def run(self) -> None:
        """Accept clients one at a time until ``stop`` is called."""
        try:
            while self.__running.is_set():
                # Wait for a client to connect.
                try:
                    conn, connected_addr = self.tcp_server.accept()
                except OSError:
                    # Accept timeout, or the listening socket was closed by
                    # stop(); the loop condition decides whether to go on.
                    continue
                # The context manager closes the client socket on every path.
                with conn:
                    self._receive_from(conn, connected_addr)
        finally:
            self.tcp_server.close()

    def stop(self):
        """Ask both threads to finish and close the listening socket."""
        self.__running.clear()
        # Wake the processing thread so it notices the cleared flag.
        self.__had_data.set()
        try:
            self.tcp_server.close()
        except OSError:
            # Best-effort shutdown: a failing close must not reach the caller.
            return