import os.path
import socket
import threading
import time
import traceback
from datetime import datetime
from queue import Queue
import concurrent.futures

import msgpack

from AcrossPlatform.get_platform import GLOBAL_DIR
from LSZXNetWork.consensus import *
from LogRecord.log_recorder import GLOBAL_LOG

# Upper bound on the bytes buffered while waiting for the NUL terminator that
# ends the file-name header; protects the file server from a peer that never
# sends the terminator.
_MAX_FILE_HEADER_BYTES = 64 * 1024


class Client:
    """TCP client that sends msgpack-encoded packets or raw files.

    Every call opens a fresh connection, transmits the payload and closes the
    connection again; the receiving ``Server`` treats the close as the end of
    the message.
    """

    def __init__(self, ip, timeout=0.5) -> None:
        """Store the local bind address and the per-operation socket timeout.

        Args:
            ip: Local IP address every outgoing socket is bound to.
            timeout: Socket timeout in seconds.
        """
        super().__init__()
        self.timeout = timeout
        self.ip = ip
        self.target_ip = None
        self.target_port = None

    def set_timeout(self, timeout):
        """Change the socket timeout (seconds) used by subsequent sends."""
        self.timeout = timeout

    def connect2(self, ip, port):
        """Remember the peer used by ``send`` and ``send_file2``.

        No connection is opened here; the address is only stored.
        """
        self.target_ip = ip
        self.target_port = port

    def _open_socket(self):
        """Create a TCP socket bound to ``self.ip`` with the client timeout."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.settimeout(self.timeout)
            sock.bind((self.ip, 0))
        except Exception:
            # Do not leak the descriptor when configuration fails.
            sock.close()
            raise
        return sock

    @staticmethod
    def _send_chunked(sock, payload):
        """Send ``payload`` completely, ``SEND_BUFFER_SIZE`` bytes at a time.

        ``sendall`` is used because ``send`` may transmit only part of the
        buffer.  The payload is still chunked so the socket timeout applies
        to each chunk rather than to the whole transfer.
        """
        view = memoryview(payload)
        for offset in range(0, len(view), SEND_BUFFER_SIZE):
            sock.sendall(view[offset:offset + SEND_BUFFER_SIZE])

    def _send_packet(self, address, data):
        """Wrap ``data`` in the data envelope and send it to ``address``.

        Returns:
            True when the connection was established and the packet sent,
            False when ``connect_ex`` reported an error code.

        Raises:
            Exception: Any socket or serialization error; callers decide how
                to report it.
        """
        with self._open_socket() as sock:
            # Extra envelope so the receiver can tell data from files.
            final_pkg = {
                DATA_TYPE: DATA,
                DATA: data
            }
            bytes_data = msgpack.dumps(final_pkg)
            if sock.connect_ex(address) != 0:
                return False
            self._send_chunked(sock, bytes_data)
        return True

    def directly_send(self, args):
        """Send one packet to an explicit address.

        Args:
            args: ``(address, data)`` pair; ``address`` is converted to a
                tuple before being passed to the socket.

        Returns:
            True on success, False on connection failure or any exception
            (the traceback is printed).
        """
        address, data = args
        address = tuple(address)
        try:
            return self._send_packet(address, data)
        except Exception:
            print(traceback.format_exc())
            return False

    def distributed_send(self, address_list, pkg_list, distributor_nums=32):
        """Send ``pkg_list[i]`` to ``address_list[i]`` using a thread pool.

        The two lists are paired with ``zip``, so surplus items in the longer
        list are ignored.  The call returns after all submitted sends have
        finished (the executor is used as a context manager).

        Args:
            address_list: Iterable of peer addresses.
            pkg_list: Iterable of payloads, one per address.
            distributor_nums: Maximum number of worker threads.

        Returns:
            True once all sends have been dispatched and completed, False if
            the executor raised ``RuntimeError``.  Failures of individual
            sends are not reflected in the return value.
        """
        try:
            # Thread pool with at most ``distributor_nums`` workers.
            with concurrent.futures.ThreadPoolExecutor(max_workers=distributor_nums) as executor:
                for address, pkg in zip(address_list, pkg_list):
                    executor.submit(self.directly_send, (address, pkg))
            return True
        except RuntimeError:
            return False

    def send(self, data):
        """Send one packet to the peer stored by ``connect2``.

        Returns:
            True on success, False on connection failure or any exception
            (the exception args and traceback are printed).
        """
        try:
            return self._send_packet((self.target_ip, self.target_port), data)
        except Exception as e:
            print(e.args)
            print(traceback.format_exc())
            return False

    def send_file2(self, file_path, file_name):
        """Stream a file to the peer stored by ``connect2``.

        The wire format is the gb2312-encoded ``file_name`` followed by a NUL
        byte and then the raw file contents.

        Args:
            file_path: Path of the local file to read.
            file_name: Name announced to the receiver.

        Returns:
            True on success, False on connection failure or any exception
            (the exception args and traceback are printed).
        """
        try:
            with self._open_socket() as sock:
                if sock.connect_ex((self.target_ip, self.target_port)) != 0:
                    return False
                with open(file_path, "rb") as file:
                    # Header first so the receiver can tell where the name
                    # ends and the content starts.
                    data = file_name.encode("gb2312") + b"\x00"
                    while data:
                        sock.sendall(data)
                        data = file.read(SEND_BUFFER_SIZE)
            return True
        except Exception as e:
            print(e.args)
            print(traceback.format_exc())
            return False


class Server:
    """TCP server that queues received data packets and stores received files.

    A daemon thread accepts data connections on ``port``; when ``file_port``
    is positive a second daemon thread accepts file uploads on that port.
    """

    def __init__(self, ip, port, file_port=0, cache_path=GLOBAL_DIR) -> None:
        """Bind the listening sockets and start the background threads.

        Args:
            ip: Local IP address to listen on.
            port: Port for data packets.
            file_port: Port for file uploads; values <= 0 disable the file
                listener.
            cache_path: Directory received files are written to.
        """
        super().__init__()
        # Directory where received files are stored.
        self.cache_path = cache_path
        self.cache_name = None

        # Buffer of received data packets.  Created before any thread starts
        # so ``monitor`` can never observe a server without a queue.
        self.queue = Queue()

        self._running_signal = threading.Event()
        self._running_signal.set()

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((ip, port))
        self.sock.listen(128)

        if file_port > 0:
            self.file_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.file_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.file_sock.bind((ip, file_port))
            self.file_sock.listen(128)
            threading.Thread(target=self.file_recv, daemon=True).start()
        else:
            self.file_sock = None

        # Start the data listener.
        threading.Thread(target=self.monitor, daemon=True).start()

    def get_download_file_path(self):
        """Return the path of the most recently announced file, or None."""
        if not self.cache_name:
            return None
        return os.path.join(self.cache_path, self.cache_name)

    def monitor(self):
        """Accept data connections and queue each decoded payload.

        A message is everything the peer sends before closing the
        connection.  Errors are logged and the loop moves on to the next
        connection.
        """
        while self._running_signal.is_set():
            try:
                conn, connected_addr = self.sock.accept()
                # ``with`` guarantees the accepted socket is closed, even
                # when receiving or decoding fails.
                with conn:
                    conn.settimeout(0.5)
                    self._receive_packet(conn)
            except Exception as ce:
                GLOBAL_LOG.write(f"数据传输连接时发生错误{ce.args}")

    def _receive_packet(self, conn):
        """Read one complete message from ``conn`` and queue its payload."""
        chunks = []
        try:
            while True:
                bytes_data = conn.recv(RECV_BUFFER_SIZE)
                if not bytes_data:
                    break
                chunks.append(bytes_data)
            # Peer closed the connection: everything was received, unpack it.
            data = msgpack.loads(b"".join(chunks))
            # Only packets tagged as data are queued.
            if data[DATA_TYPE] == DATA:
                self.queue.put(data[DATA])
        except Exception as e:
            GLOBAL_LOG.write(f"数据接收时发生错误{e.args}")

    @staticmethod
    def _recv_file_header(conn):
        """Read until the NUL terminator and split file name from content.

        Returns:
            ``(filename_bytes, first_content_bytes)``.

        Raises:
            ValueError: If the peer closes the connection or exceeds
                ``_MAX_FILE_HEADER_BYTES`` before sending the terminator.
        """
        header = b""
        while b"\x00" not in header:
            if len(header) > _MAX_FILE_HEADER_BYTES:
                raise ValueError("file name header too large")
            chunk = conn.recv(RECV_BUFFER_SIZE)
            if not chunk:
                raise ValueError("connection closed before file name terminator")
            header += chunk
        filename_bytes, data = header.split(b"\x00", 1)
        return filename_bytes, data

    def file_recv(self):
        """Accept file uploads and write each one into ``cache_path``.

        The expected wire format is a gb2312-encoded file name, a NUL byte,
        then the file contents until the peer closes the connection.  Errors
        are logged and the loop moves on to the next connection.
        """
        while self._running_signal.is_set():
            try:
                conn, connected_addr = self.file_sock.accept()
                with conn:
                    conn.settimeout(1)
                    os.makedirs(self.cache_path, exist_ok=True)
                    filename_bytes, data = self._recv_file_header(conn)
                    # The name comes from the network: keep only its final
                    # component so it cannot escape ``cache_path``.
                    self.cache_name = os.path.basename(filename_bytes.decode("gb2312"))
                    complete_cache_path = os.path.join(self.cache_path, self.cache_name)
                    with open(complete_cache_path, 'wb') as file:
                        while True:
                            file.write(data)
                            data = conn.recv(RECV_BUFFER_SIZE)
                            if not data:
                                break
            except Exception as ce:
                GLOBAL_LOG.write(f"文件传输连接时发生错误{ce.args}")

    def get_data(self, timeout=None):
        """Return the next received payload.

        Args:
            timeout: Seconds to wait; a falsy value (``None`` or ``0``)
                blocks until data is available.

        Raises:
            queue.Empty: If ``timeout`` elapses without data.
        """
        if not timeout:
            return self.queue.get()
        # Must be passed by keyword: the first positional parameter of
        # ``Queue.get`` is ``block``, not ``timeout``.
        return self.queue.get(timeout=timeout)

    def stop(self):
        """Signal the worker loops to exit and close the listening sockets."""
        self._running_signal.clear()
        self.sock.close()
        if self.file_sock is not None:
            self.file_sock.close()


if __name__ == "__main__":
    server = Server(ip="127.0.0.1", port=13132)
    client = Client(ip="127.0.0.1")
    for i in range(100):
        # client = Client(ip="127.0.0.1", port=0)
        client.connect2(ip="127.0.0.1", port=13132)
        client.send({f"mes{i}": "hello"})
        print(server.get_data(1))

    # address_list = []
    # pkg_list = []
    # for i in range(100):
    #     address_list.append(("127.0.0.1", 13132))
    #     pkg_list.append({f"mes{i}": "hello"})
    # start_time = time.time()
    # client.distributed_send(address_list, pkg_list)
    # print(time.time() - start_time)
    # for i in range(100):
    #     print(server.get_data(1))
    time.sleep(5)
    # server.stop()