目录
说明:只能连接一个客户端
- import socket,binascii
-
- # 创建一个 TCP 套接字
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- # 绑定套接字到指定的主机和端口
- server_address = ('localhost', 6666)
- server_socket.bind(server_address)
-
- # 开始监听连接
- server_socket.listen(5) # 最多同时监听 5 个连接请求
-
- print("等待客户端连接...")
-
- while True:
- # 等待客户端连接
- client_socket, client_address = server_socket.accept()
- print(f"接受来自 {client_address} 的连接")
-
- # 接收客户端发送的数据
- while True:
- data = client_socket.recv(1024) # 最多接收 1024 字节的数据
- data_hex = binascii.hexlify(data).decode('utf-8') # 将二进制数据转换为十六进制字符串
- print(data_hex)
-
- # 向客户端回复响应数据
- sendData = "1A2B3C"
- client_socket.send(bytes.fromhex(sendData))
- if not data:
- break # 如果没有数据,退出循环
-
- # 关闭与客户端的连接
- client_socket.close()
- break
-
- # 关闭服务器套接字
- server_socket.close()
客户端测试结果:

服务端测试结果:

说明:可以支持连接多个客户端
- import socket,binascii,threading
-
- def thread_HandleClient(client_socket): # 用于处理与客户端连接后的逻辑
- while True:
- # 接收客户端发送的数据
- data = client_socket.recv(1024)
- data_hex = binascii.hexlify(data).decode('utf-8') # 将二进制数据转换为十六进制字符串
- print(f"接收到来自 {client_socket.getpeername()} 的数据:{data_hex}")
-
- if not data:
- break # 如果没有数据,退出循环
-
- # 向客户端发送响应数据
- sendData = "1A2B3C"
- client_socket.send(bytes.fromhex(sendData))
- # client_socket.sendall(bytes.fromhex(sendData))
-
- # 关闭与客户端的连接
- client_socket.close()
-
- # 创建一个 TCP 套接字
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- # 绑定套接字到指定的主机和端口
- server_address = ('localhost', 6666)
- server_socket.bind(server_address)
-
- # 开始监听连接
- server_socket.listen(5) # 最多同时监听 5 个连接请求
-
- print("等待客户端连接...")
-
- while True:
- # 等待客户端连接
- client_socket, client_address = server_socket.accept()
- print(f"接受来自 {client_address} 的连接")
-
- # 创建一个新线程来处理客户端连接
- client_thread = threading.Thread(target=thread_HandleClient, args=(client_socket,))
- client_thread.start()
客户端测试结果:

服务端测试结果:

说明: 可以支持连接多个客户端,并且能够做到和多个客户端发送、接收信息同时进行
- import socket,threading,binascii,time
-
-
- def receive_data(client_socket):
- while True:
- try:
- # 接收客户端发送的数据
- data = client_socket.recv(1024)
- data_hex = binascii.hexlify(data).decode('utf-8') # 将二进制数据转换为十六进制字符串
- print(f"接收到来自 {client_socket.getpeername()} 的数据:{data_hex}")
-
- if not data:
- break # 如果没有数据,退出循环
-
- except Exception as e:
- print(f"接收数据时出现错误:{e}")
- break
-
- # 关闭与客户端的连接
- client_socket.close()
-
- def send_data(client_socket):
- while True:
- try:
- sendData = "1A2A3A" # 要发送的数据
- client_socket.sendall(bytes.fromhex(sendData)) # 向客户端发送数据
- time.sleep(1) # 1s发1条
-
- except Exception as e:
- print(f"发送数据时出现错误:{e}")
- break
-
- if __name__ == '__main__':
- # 创建一个 TCP 套接字
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- # 绑定套接字到指定的主机和端口
- server_address = ('localhost', 6666)
- server_socket.bind(server_address)
-
- # 开始监听连接
- server_socket.listen(5) # 最多同时监听 5 个连接请求
-
- print("等待客户端连接...")
-
- while True:
- # 等待客户端连接
- client_socket, client_address = server_socket.accept()
- print(f"接受来自 {client_address} 的连接")
-
- # 创建两个新线程分别处理接收和发送操作
- receive_thread = threading.Thread(target=receive_data, args=(client_socket,))
- send_thread = threading.Thread(target=send_data, args=(client_socket,))
-
- # 启动线程
- receive_thread.start()
- send_thread.start()
客户端测试结果:

服务端测试结果:
