245 lines
8.7 KiB
Python
245 lines
8.7 KiB
Python
import os.path
|
||
import socket
|
||
import threading
|
||
import time
|
||
import traceback
|
||
from datetime import datetime
|
||
from queue import Queue
|
||
import concurrent.futures
|
||
import msgpack
|
||
|
||
from AcrossPlatform.get_platform import GLOBAL_DIR
|
||
from LSZXNetWork.consensus import *
|
||
from LogRecord.log_recorder import GLOBAL_LOG
|
||
|
||
|
||
class Client:
|
||
|
||
def __init__(self, ip, timeout=0.5) -> None:
|
||
super().__init__()
|
||
self.timeout = timeout
|
||
self.ip = ip
|
||
self.target_ip = None
|
||
self.target_port = None
|
||
|
||
def set_timeout(self, timeout):
|
||
self.timeout = timeout
|
||
|
||
def connect2(self, ip, port):
|
||
self.target_ip = ip
|
||
self.target_port = port
|
||
|
||
def directly_send(self, args):
|
||
address, data = args
|
||
address = tuple(address)
|
||
try:
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
sock.settimeout(self.timeout)
|
||
sock.bind((self.ip, 0))
|
||
|
||
# 增加一步封装,以区分内容为数据还是文件
|
||
final_pkg = {
|
||
DATA_TYPE: DATA,
|
||
DATA: data
|
||
}
|
||
|
||
bytes_data = msgpack.dumps(final_pkg)
|
||
status_code = sock.connect_ex(address)
|
||
if status_code == 0:
|
||
seq_data = [
|
||
bytes_data[
|
||
i * SEND_BUFFER_SIZE:
|
||
(i + 1) * SEND_BUFFER_SIZE
|
||
]
|
||
for i in range(int(len(bytes_data) / SEND_BUFFER_SIZE) + 1)
|
||
]
|
||
for b_data in seq_data:
|
||
sock.send(b_data)
|
||
# sock.send(bytes_data)
|
||
else:
|
||
return False
|
||
return True
|
||
except Exception as e:
|
||
print(traceback.format_exc())
|
||
return False
|
||
|
||
def distributed_send(self, address_list, pkg_list, distributor_nums=32):
|
||
try:
|
||
# 创建一个线程池,最大线程数为distributor_nums
|
||
with concurrent.futures.ThreadPoolExecutor(max_workers=distributor_nums) as executor:
|
||
# 提交任务到线程池
|
||
for address, pkg in zip(address_list, pkg_list):
|
||
executor.submit(self.directly_send, (address, pkg))
|
||
return True
|
||
except RuntimeError:
|
||
return False
|
||
|
||
def send(self, data):
|
||
try:
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
sock.settimeout(self.timeout)
|
||
sock.bind((self.ip, 0))
|
||
|
||
# 增加一步封装,以区分内容为数据还是文件
|
||
final_pkg = {
|
||
DATA_TYPE: DATA,
|
||
DATA: data
|
||
}
|
||
|
||
bytes_data = msgpack.dumps(final_pkg)
|
||
status_code = sock.connect_ex((self.target_ip, self.target_port))
|
||
if status_code == 0:
|
||
seq_data = [
|
||
bytes_data[
|
||
i * SEND_BUFFER_SIZE:
|
||
(i + 1) * SEND_BUFFER_SIZE
|
||
]
|
||
for i in range(int(len(bytes_data) / SEND_BUFFER_SIZE) + 1)
|
||
]
|
||
for b_data in seq_data:
|
||
sock.send(b_data)
|
||
else:
|
||
return False
|
||
return True
|
||
except Exception as e:
|
||
print(e.args)
|
||
print(traceback.format_exc())
|
||
return False
|
||
|
||
def send_file2(self, file_path, file_name):
|
||
try:
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
sock.settimeout(self.timeout)
|
||
sock.bind((self.ip, 0))
|
||
|
||
status_code = sock.connect_ex((self.target_ip, self.target_port))
|
||
if status_code == 0:
|
||
with open(file_path, "rb") as file:
|
||
data = file_name.encode("gb2312") + b"\x00"
|
||
# 增加一步封装,以区分内容为数据还是文件
|
||
while data:
|
||
sock.send(data)
|
||
data = file.read(SEND_BUFFER_SIZE)
|
||
else:
|
||
return False
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(e.args)
|
||
print(traceback.format_exc())
|
||
return False
|
||
|
||
|
||
class Server:
|
||
|
||
def __init__(self, ip, port, file_port=0, cache_path=GLOBAL_DIR) -> None:
|
||
super().__init__()
|
||
# 文件保存路径
|
||
self.cache_path = cache_path
|
||
self.cache_name = None
|
||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
self.sock.bind((ip, port))
|
||
self.sock.listen(128)
|
||
self._running_signal = threading.Event()
|
||
self._running_signal.set()
|
||
if file_port > 0:
|
||
self.file_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
self.file_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
self.file_sock.bind((ip, file_port))
|
||
self.file_sock.listen(128)
|
||
threading.Thread(target=self.file_recv, daemon=True).start()
|
||
else:
|
||
self.file_sock = None
|
||
# 启动监听服务
|
||
threading.Thread(target=self.monitor, daemon=True).start()
|
||
# 数据接收缓存
|
||
self.queue = Queue()
|
||
|
||
def get_download_file_path(self):
|
||
if not self.cache_name:
|
||
return None
|
||
return os.path.join(self.cache_path, self.cache_name)
|
||
|
||
def monitor(self):
|
||
while self._running_signal.is_set():
|
||
try:
|
||
conn, connected_addr = self.sock.accept()
|
||
conn: socket.socket
|
||
conn.settimeout(0.5)
|
||
cache = b""
|
||
while True:
|
||
try:
|
||
bytes_data = conn.recv(RECV_BUFFER_SIZE)
|
||
if bytes_data:
|
||
cache += bytes_data
|
||
else:
|
||
# 成功接收全部信息,解包
|
||
data = msgpack.loads(cache)
|
||
# 判断数据标签
|
||
if data[DATA_TYPE] == DATA:
|
||
self.queue.put(data[DATA])
|
||
break
|
||
except Exception as e:
|
||
GLOBAL_LOG.write(f"数据接收时发生错误{e.args}")
|
||
break
|
||
except Exception as ce:
|
||
GLOBAL_LOG.write(f"数据传输连接时发生错误{ce.args}")
|
||
|
||
def file_recv(self):
|
||
while self._running_signal.is_set():
|
||
try:
|
||
conn, connected_addr = self.file_sock.accept()
|
||
conn: socket.socket
|
||
conn.settimeout(1)
|
||
os.makedirs(self.cache_path, exist_ok=True)
|
||
filename_bytes, data = conn.recv(RECV_BUFFER_SIZE).split(b"\x00", 1)
|
||
self.cache_name = filename_bytes.decode("gb2312")
|
||
complete_cache_path = os.path.join(self.cache_path, self.cache_name)
|
||
with open(complete_cache_path, 'wb') as file:
|
||
while True:
|
||
file.write(data)
|
||
data = conn.recv(RECV_BUFFER_SIZE)
|
||
if not data:
|
||
break
|
||
conn.close()
|
||
except Exception as ce:
|
||
GLOBAL_LOG.write(f"文件传输连接时发生错误{ce.args}")
|
||
|
||
def get_data(self, timeout=None):
|
||
if not timeout:
|
||
return self.queue.get()
|
||
else:
|
||
return self.queue.get(timeout)
|
||
|
||
def stop(self):
|
||
self._running_signal.clear()
|
||
self.sock.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
server = Server(ip="127.0.0.1", port=13132)
|
||
client = Client(ip="127.0.0.1")
|
||
for i in range(100):
|
||
# client = Client(ip="127.0.0.1", port=0)
|
||
client.connect2(ip="127.0.0.1", port=13132)
|
||
client.send({f"mes{i}": "hello"})
|
||
print(server.get_data(1))
|
||
|
||
# address_list = []
|
||
# pkg_list = []
|
||
# for i in range(100):
|
||
# address_list.append(("127.0.0.1", 13132))
|
||
# pkg_list.append({f"mes{i}": "hello"})
|
||
# start_time = time.time()
|
||
# client.distributed_send(address_list, pkg_list)
|
||
# print(time.time() - start_time)
|
||
# for i in range(100):
|
||
# print(server.get_data(1))
|
||
|
||
time.sleep(5)
|
||
# server.stop()
|