• Python使用socket的UDP协议实现FTP文件服务


    简介

      本示例主要是用Python的socket,使用UDP协议实现一个FTP服务端、FTP客户端,用来实现文件的传输。在公司内网下,可以不使用U盘的情况下,纯粹使用网络,来实现文件服务器的搭建,进而实现文件的网络传输。同时用来理解Python的socket使用。

      服务端运行起来后,会把服务器上面的指定目录作为根目录提供给客户端使用,即客户端可以访问、下载服务端设置的根目录里面的文件内容

      客户端可以发送一些简单命令给服务端,支持的一些简单命令如下:

    • “ll”或者“ls”  查看当前目录下的所有文件或者目录
    • “pwd”  查看当前所在的目录(根目录是服务端设置的“D:\var”目录)
    • “get 文件名”  下载指定的文件到客户端配置的目录(客户端指定的根目录,在运行时配置)
    • “get 目录”  下载指定的目录到客户端配置的目录
    • “get all”  把当前所在的目录的所有文件、目录下载到客户端配置的目录
    • “cd”  把客户端的目录切换到根目录
    • “cd 目录” 把客户端的目录切换到指定的目录
    • “cd ..” 把客户端的目录切换到上一级目录

     

      客户端和服务端之间的通信,是把dict格式使用pickle.dumps()和pickle.loads()转成对应的bytes类型进行传输的。dict格式参考代码。

     

    使用效果示例

    先运行服务端代码,再运行客户端代码。然后再在客户端输入响应的命令即可

     

    代码

    file_handler.py

      该文件就是把对文件的一些操作进行提取出来,供UDP服务端使用

    复制代码
    
    
    import os
    import logging
    import traceback


    LOG_FORMAT = "%(asctime)s - %(levelname)s [%(filename)s-%(funcName)s] Line: %(lineno)s] - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    _logger = logging.getLogger()


    class FileHandler:
    def __init__(self, logger=None):
    """
    由于设置成可以支持自定义的logger,因此就没有设置成工具类形式,使用的时候,还是要实例化该类的
    :param logger:
    """
    self.__logger = logger if logger else _logger

    def list_dir(self, dir_path: str, return_absolute_path=False) -> list:
    """
    获取指定目录下面的文件或者目录的列表
    返回列表,里面中的每一个元素都是元组,元组的第一个值是文件或者目录名,第二个值是"d"(代表目录)或者"f"(代表文件)
    :param dir_path:
    :param return_absolute_path: True: 返回值是返回文件或者目录的绝对路径; False: 只返回文件名或者目录名
    :return: [('download', 'd'), ('mylog.txt', 'f')]
    """
    ret_list = []
    try:
    if os.path.exists(dir_path):
    ret = os.listdir(dir_path)
    for item in ret:
    if os.path.isdir(os.path.join(dir_path, item)):
    if return_absolute_path:
    ret_list.append((os.path.join(dir_path, item), 0, "d"))
    else:
    ret_list.append((item, 0, "d"))
    if os.path.isfile(os.path.join(dir_path, item)):
    size = os.path.getsize(os.path.join(dir_path, item))
    if return_absolute_path:
    ret_list.append((os.path.join(dir_path, item), size, "f"))
    else:
    ret_list.append((item, size, "f"))
    except Exception:
    self.__logger.error("Can not list dir: [%s]" % dir_path + traceback.format_exc())
    finally:
    return ret_list

    def seek_file(self, file_path: str, start_index: int, length=1024) -> tuple:
    """
    通过二进制格式读取指定一个文件指定范围的内容
    :param file_path:
    :param start_index:
    :param length: 读取的字节数
    :return:
    """
    # 下一次访问的时候的起始start_index值。 -1代表已经访问到文件结尾了,不用再访问该文件了。
    content_bytes = b''
    next_index = -1
    if not os.path.exists(file_path):
    message = "File[%s] not exists !!!" % file_path
    self.__logger.error(message)
    raise Exception(message)
    file_size = os.path.getsize(file_path) # 文件大小

    if start_index >= file_size:
    return content_bytes, next_index
    try:
    # print("### file_size: ", file_size)
    with open(file_path, "rb") as fh:
    fh.seek(start_index) # 游标跳到指定位置
    content_bytes = fh.read(length) # 读取文件内容
    # print("content_bytes: ", content_bytes)
    # print("type(content_bytes): ", type(content_bytes))
    if start_index + length < file_size:
    next_index = start_index + length
    except Exception:
    self.__logger.error("Seek file exception !!! " + traceback.format_exc())
    finally:
    return content_bytes, next_index


    if __name__ == '__main__':
    file = r"D:\var\download\system.log"
    file_target = r"D:\var\download\system.txt"
    file = r"D:\软件安装包\NetAssist.exe"
    file_target = r"D:\软件安装包\NetAssist_copy.exe"
    file_obj = FileHandler()
    # ret = file_obj.seek_file(file, start_index=17, length=30)
    # print("ret: ", ret)
    # file_obj.copy_file(file, file_target, 1024 * 1000)
     
    复制代码

     

    服务端代码

    复制代码
    
    
    """
    使用socket的udp协议实现的一个ftp服务端。
    服务器和客户端之间传递数据格式:
    1、服务端和客户端统一使用Python的字典格式(也就是本例中自定义的"通信协议"),格式形如:
    {
    "type": "cmd", # 支持的值有: "cmd"、"download"
    "body": "ll", # 在cmd模式下,常用的命令有: ll、ls、cd 指定目录、pwd
    "pwd": ["folder1", "folder2"],
    "status": 1,
    "uuid": "b93e21e659f711ee9285a46bb6f59f55" # uuid.uuid1().hex,用来保证客户端和服务端
    }
    2、客户端使用pickle模块的pickle.dumps(dict类型数据)把要发送的数据转成bytes类型
    3、客户端使用socket的udp传输转成的bytes数据给服务端
    4、服务端接收到从客户端发送过来的bytes类型的数据,再使用pickle.loads(bytes类型数据)把数据转成原始的dict类型。

    使用socket的udp,既能接收数据,又能发送数据,因此服务端和客户端都是相对的。
    """

    import os
    import pickle
    import sys
    import socket
    import logging
    import time
    import traceback
    from file_handler import FileHandler

    LOG_FORMAT = "%(asctime)s - %(levelname)s [%(filename)s-%(funcName)s] Line: %(lineno)s] - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    _logger = logging.getLogger()

    _HOST = "127.0.0.1"
    _PORT = 8090 # 服务器端口号
    _ROOT_DIR = r"D:\var" # 服务端给客户端展示的可以进行下载的文件路径


    __all__ = ["FTPServerUDP"]


    class FTPServerUDP:
    def __init__(self, host="", port=None, root_dir="", logger=None):
    self.__host = host if host else _HOST
    self.__port = port if port else _PORT
    self.__root_dir = root_dir if root_dir else _ROOT_DIR
    self.__logger = logger if logger else _logger
    self.__file_handler = FileHandler()

    self.__socket_obj = self.__get_socket_obj()

    self.__message_type_unsupported = "Unsupported message type"
    self.__message_type_server_inner_error = "Server internal error"
    self.__message_type_path_not_exists = "Target path not exists"
    self.__message_type_ok = "ok"

    def __get_socket_obj(self) -> socket.socket:
    socket_obj = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    socket_obj.bind((self.__host, self.__port)) # 服务端必须绑定IP和端口
    return socket_obj

    def __message_handler(self, message_tuple: tuple):
    # 给客户端返回的数据类型,最终还需要使用pickle.dumps()把字典类型转成bytes类型进行发送
    response_message_dict = {
    "type": "cmd",
    "body": self.__message_type_unsupported,
    "pwd": [], # 客户端的相对于服务端self.__root_dir的相对路径
    "status": 0 # 0:代表后端处理结果异常; 1:代表后端处理结果正常
    }

    recv_message_bytes = message_tuple[0]
    client_tuple = message_tuple[1] # 客户端元组信息,形如 ('127.0.0.1', 59699)

    # 确保接收到的消息是 pickle.dumps() 转成的bytes类型,并把数据转成dict类型
    # recv_message_dict 的格式形如上面的response_message_dict格式
    recv_message_dict = self.__check_recv_message_type(recv_message_bytes)
    if not recv_message_dict:
    response_message_dict["body"] = self.__message_type_unsupported
    response_message_dict["status"] = 0
    self.__send_bytes_message(response_message_dict, client_tuple)
    return
    # 把客户端的进入的目录,赋值给返回值的目录。
    response_message_dict["pwd"] = recv_message_dict.get("pwd", [])

    # 把客户端传递进来的uuid,赋值给返回值的目录
    response_message_dict["uuid"] = recv_message_dict.get("uuid", "")

    # 接收到的消息符合规范,就需要根据"type"类型进行分类。
    try:
    print("recv_message_dict: ", recv_message_dict)
    if recv_message_dict.get("type", "") == "cmd":
    self.__cmd_handler(recv_message_dict, response_message_dict, client_tuple)
    elif recv_message_dict.get("type", "") == "download":
    self.__download_handler(recv_message_dict, response_message_dict, client_tuple)
    pass
    except Exception:
    self.__logger.error("Server message handler exception !!!" + traceback.format_exc())
    response_message_dict["status"] = 0
    response_message_dict["body"] = self.__message_type_server_inner_error
    self.__send_bytes_message(response_message_dict, client_tuple)

    def __check_recv_message_type(self, recv_message_bytes: bytes) -> dict:
    """
    确保接收到的消息是 pickle.dumps() 转成的bytes类型,并且通过pickle.loads()把bytes类型的数据转成dict后,
    dict类型数据中要有: "type"、"body"、"pwd"、"uuid" 等字段
    :param recv_message_bytes:
    :return:
    """
    ret_dict = {}
    try:
    message_dict = pickle.loads(recv_message_bytes)
    except Exception:
    return ret_dict

    if not isinstance(message_dict, dict):
    return ret_dict

    # 接收到的dict类型的消息中,必须要有如下的字段
    if not {"type", "body", "pwd", "uuid"}.issubset(set(message_dict.keys())):
    return ret_dict

    return message_dict

    def __send_bytes_message(self, message_dict: dict, client_tuple: tuple):
    """
    使用pickle.dumps()把字典格式的消息(message_dict),转成bytes发送给客户端(client_tuple)
    :param message_dict:
    :param client_tuple:
    :return:
    """
    try:
    message_bytes = pickle.dumps(message_dict)
    self.__socket_obj.sendto(message_bytes, client_tuple)
    except Exception:
    self.__logger.error("Send message to client exception !!! " + traceback.format_exc())
    message_dict["status"] = 0
    message_dict["body"] = self.__message_type_server_inner_error
    message_bytes = pickle.dumps(message_dict)
    self.__socket_obj.sendto(message_bytes, client_tuple)

    def __cmd_handler(self, recv_message_dict: dict, response_message_dict: dict, client_tuple: tuple):
    """
    处理消息体的type参数值是"cmd",这种命令行的消息
    :param recv_message_dict:
    :param response_message_dict:
    :param client_tuple:
    :return:
    """
    cmd_str = recv_message_dict.get("body", "").lower().strip()
    if not cmd_str or (not isinstance(cmd_str, str)):
    # 返回值形如: {'type': 'cmd', 'body': 'Unsupported message type', 'pwd': ['download']}
    response_message_dict["body"] = self.__message_type_unsupported
    response_message_dict["status"] = 0
    self.__send_bytes_message(response_message_dict, client_tuple)
    return

    if cmd_str in ("ls", "ll"): # 查看当前目录的文件目录列表
    # 返回值形如: {'type': 'cmd', 'body': [('folder1', 'd'), ('file1.txt', 'f')]}
    customer_dir = self.__root_dir
    if recv_message_dict.get("pwd", []):
    customer_dir = os.path.join(self.__root_dir, *recv_message_dict.get("pwd", []))
    response_message_dict["body"] = self.__file_handler.list_dir(customer_dir)
    response_message_dict["status"] = 1 # 处理结果符合预期,状态设置成1
    self.__send_bytes_message(response_message_dict, client_tuple)
    return

    if cmd_str.startswith("cd"): # 切换到下一个目录
    if cmd_str.strip() == "cd":
    response_message_dict["pwd"] = []
    response_message_dict["body"] = self.__message_type_ok
    response_message_dict["status"] = 1 # 处理结果符合预期,状态设置成1
    self.__send_bytes_message(response_message_dict, client_tuple)
    return

    target_dir = cmd_str.split(" ")[-1]
    if target_dir == "..":
    response_message_dict["pwd"] = response_message_dict["pwd"][0:-1]
    response_message_dict["body"] = self.__message_type_ok
    response_message_dict["status"] = 1 # 处理结果符合预期,状态设置成1
    self.__send_bytes_message(response_message_dict, client_tuple)
    return

    if target_dir == "/":
    response_message_dict["body"] = self.__message_type_ok
    response_message_dict["status"] = 1 # 处理结果符合预期,状态设置成1
    self.__send_bytes_message(response_message_dict, client_tuple)
    return
    if not os.path.exists(os.path.join(self.__root_dir, *recv_message_dict.get("pwd", []), target_dir)):
    # 返回值形如: {'type': 'cmd', 'body': 'Target path not exists', 'pwd': ['folder1']}
    response_message_dict["body"] = self.__message_type_path_not_exists
    response_message_dict["status"] = 0
    self.__send_bytes_message(response_message_dict, client_tuple)
    return
    elif not (os.path.join(self.__root_dir, *recv_message_dict.get("pwd", []), target_dir).startswith(self.__root_dir)):
    # 客户进入的目录,必须是以 self.__root_dir 开头,不能进入到其他目录
    # 返回值形如: {'type': 'cmd', 'body': 'Target path not exists', 'pwd': ['folder1']}
    # response_message_dict["pwd"] = []
    response_message_dict["body"] = self.__message_type_path_not_exists
    response_message_dict["status"] = 0
    self.__send_bytes_message(response_message_dict, client_tuple)
    return
    elif os.path.isfile(os.path.join(self.__root_dir, *recv_message_dict.get("pwd", []), target_dir)): # 文件
    # 返回值形如: {'type': 'cmd', 'body': 'Target path not exists', 'pwd': ['folder1']}
    response_message_dict["body"] = self.__message_type_path_not_exists
    response_message_dict["status"] = 0
    self.__send_bytes_message(response_message_dict, client_tuple)
    return
    else:
    # 返回值形如: {'type': 'cmd', 'body': 'ok', 'pwd': ['folder1', 'folder2']}
    response_message_dict["pwd"].append(target_dir)
    response_message_dict["body"] = self.__message_type_ok
    response_message_dict["status"] = 1 # 处理结果符合预期,状态设置成1
    self.__send_bytes_message(response_message_dict, client_tuple)
    return

    if cmd_str == "pwd":
    # 返回值形如 {'type': 'cmd', 'body': ['folder1'], 'pwd': ['folder1']}
    response_message_dict["body"] = response_message_dict.get("pwd", [])
    response_message_dict["status"] = 1 # 处理结果符合预期,状态设置成1
    self.__send_bytes_message(response_message_dict, client_tuple)
    return

    self.__send_bytes_message(response_message_dict, client_tuple)
    return

    def __download_handler(self, recv_message_dict: dict, response_message_dict: dict, client_tuple: tuple):
    """
    处理消息体的type参数值是"download",这种下载单个文件的命令行的消息
    :param recv_message_dict:
    :param response_message_dict:
    :param client_tuple:
    :return:
    """
    response_message_dict["type"] = "download"
    try:
    file_name = recv_message_dict["body"]["file_name"] # 要下载的文件的名字
    pwd_list = recv_message_dict["pwd"] # 要下载的文件所在的路径
    start_index = recv_message_dict["body"]["start_index"] # 要下载的文件的起始位置索引
    length = recv_message_dict["body"]["length"] # 单次下载的字节数
    if length > 1024 * 60: # 单次传输的数据太大的话,会报错。通过测试,单次发送60KB数据比较合适。
    length = 1024 * 60
    file_name_absolute = os.path.join(self.__root_dir, *pwd_list, file_name) # 要下载的文件的绝对路径

    content_bytes, next_index = self.__file_handler.seek_file(file_path=file_name_absolute,
    start_index=start_index,
    length=length)
    response_message_dict["body"] = {
    "file_name": file_name,
    "pwd": pwd_list,
    "content_bytes": content_bytes,
    "start_index": start_index,
    "next_index": next_index
    }
    response_message_dict["status"] = 1
    self.__send_bytes_message(response_message_dict, client_tuple)
    except Exception:
    response_message_dict["body"] = self.__message_type_server_inner_error
    response_message_dict["status"] = 0
    self.__logger.error("Download file exception !!!" + traceback.format_exc())
    self.__send_bytes_message(response_message_dict, client_tuple)

    def run(self):
    """
    调用该方法,启动服务端
    :return:
    """
    self.__logger.info("Server is running at [%s@%s], wating for client ..." % (self.__host, self.__port))
    while True:
    try:
    # message_tuple 是一个元组类型: (b'消息体', ('127.0.0.1', 61040)),第一个值是bytes类型的消息,第二个值是客户端信息
    message_tuple = self.__socket_obj.recvfrom(1024 * 1024) # 从客户端接收到的消息
    self.__message_handler(message_tuple)
    except Exception:
    self.__logger.error("FTP Server Error: " + traceback.format_exc())
    time.sleep(1)


    if __name__ == '__main__':
    ftp_server_obj = FTPServerUDP(host="127.0.0.1",
    port=8090,
    root_dir=r"D:\var" # 服务端的根目录,该目录下的文件及目录可以供客户端查看、下载
    )
    ftp_server_obj.run()
     
    复制代码

     

    客户端代码

    复制代码
    
    
    """
    使用socket的udp协议实现的一个ftp客户端。
    服务器和客户端之间传递数据格式:
    1、服务端和客户端统一使用Python的字典格式(也就是本例中自定义的"通信协议"),格式形如:
    {
    "type": "cmd", # 支持的值有: "cmd"、"download"
    "body": "ll", # 在cmd模式下,常用的命令有: ll、ls、cd 指定目录、pwd
    "pwd": ["folder1", "folder2"],
    "status": 1,
    "uuid": "b93e21e659f711ee9285a46bb6f59f55" # uuid.uuid1().hex,用来保证客户端和服务端
    }
    2、客户端使用pickle模块的pickle.dumps(dict类型数据)把要发送的数据转成bytes类型
    3、客户端使用socket的udp传输转成的bytes数据给服务端
    4、服务端接收到从客户端发送过来的bytes类型的数据,再使用pickle.loads(bytes类型数据)把数据转成原始的dict类型。

    使用socket的udp,既能接收数据,又能发送数据,因此服务端和客户端都是相对的。
    """

    import socket
    import sys
    import time
    import pickle
    import os
    import logging
    import traceback
    import uuid

    LOG_FORMAT = "%(asctime)s - %(levelname)s [%(filename)s-%(funcName)s] Line: %(lineno)s] - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    _logger = logging.getLogger()


    class FTPClientUDP:
    def __init__(self, host="127.0.0.1", port=8090, download_path="D:\my_download", logger=None):
    self.__host = host # 服务器的IP地址
    self.__port = port # 服务器的端口号
    self.__download_root_path = download_path # 要下载的文件路径
    self.__logger = logger if logger else _logger
    if not os.path.exists(download_path):
    os.makedirs(download_path, True)

    self.__socket_obj = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP连接对象
    self.__is_downloading = False # 用来标识是否在下载文件,当在下载文件的时候,就不会再接收输入参数
    self.__pwd_list = [] # 用来存储用户所在的当前目录
    self.__server_tuple = (self.__host, self.__port) # 服务端的地址和端口号组成的元组

    def __cmd_handler(self, cmd_dict: dict):
    """

    :param cmd_dict: 形如:{'type': 'cmd', 'body': 'll', 'pwd': [], 'status': 1,
    'uuid': 'f5c2f4d263f111ee877aa46bb6f59f55'}
    :return:
    """
    # print("cmd_dict: ", cmd_dict)
    cmd_bytes = pickle.dumps(cmd_dict)
    if cmd_dict["body"] in ["ll", "ls"]:
    self.__cmd_handler_ls(cmd_dict, cmd_bytes)
    elif cmd_dict["body"].startswith("cd "):
    self.__cmd_handler_cd(cmd_dict, cmd_bytes)
    elif cmd_dict["body"] == "pwd":
    self.__cmd_handler_pwd(cmd_dict, cmd_bytes)
    elif cmd_dict["body"].startswith("get "):
    self.__cmd_handler_get(cmd_dict, cmd_bytes)
    else:
    print("Command [%s] not supported !!!" % cmd_dict["body"])
    print()

    def __cmd_handler_ls(self, cmd_dict: dict, cmd_bytes: bytes):
    """
    发送类似于"ls -la" 命令给服务端,并把返回值打印到终端
    :param cmd_dict:
    :param cmd_bytes:
    :return:
    """
    recv_message_dict = self.__recv_normal_cmd_handler(cmd_dict, cmd_bytes)
    self.__print_normal_info(recv_message_dict)

    def __cmd_handler_cd(self, cmd_dict: dict, cmd_bytes: bytes):
    """
    发送类似于"cd 指定目录"命令给服务端,并把返回值打印到终端
    :param cmd_dict:
    :param cmd_bytes:
    :return:
    """
    recv_message_dict = self.__recv_normal_cmd_handler(cmd_dict, cmd_bytes)
    self.__print_normal_info(recv_message_dict)
    if recv_message_dict.get("status", 0):
    self.__pwd_list = recv_message_dict["pwd"] # 保存客户端进入的目录

    def __cmd_handler_pwd(self, cmd_dict: dict, cmd_bytes: bytes):
    """
    发送 "pwd" 命令给服务端,并把返回值打印到终端
    :param cmd_dict:
    :param cmd_bytes:
    :return:
    """
    recv_message_dict = self.__recv_normal_cmd_handler(cmd_dict, cmd_bytes)
    self.__print_pwd_info(recv_message_dict)
    if recv_message_dict.get("status", 0):
    self.__pwd_list = recv_message_dict["pwd"] # 保存客户端进入的目录

    def __cmd_handler_get(self, cmd_dict: dict, cmd_bytes: bytes):
    # 获取当前目录下面的所有的文件,包括文件夹
    if cmd_dict["body"].lower().strip() == "get all":
    self.__download_handler_all_file()
    return

    # 获取当前目录下面"get "命令后面的一个文件
    command = cmd_dict["body"]
    file_name = cmd_dict["body"].split(" ")[-1] # 字符串类型的要下载的文件名
    if not file_name:
    return
    # 首先获取当前目录"ll"的返回值
    cmd_dict_ll = {'type': 'cmd',
    'body': 'll',
    'pwd': cmd_dict["pwd"],
    'status': 1,
    'uuid': uuid.uuid1().hex
    }
    ret_dict = self.__recv_normal_cmd_handler(cmd_dict_ll)
    # print("111######### ret_dict: ", ret_dict)
    can_download = False
    file_size = 0 # 文件大小
    pwd = [] # 文件所处路径
    file_type = "f" # f:文件 d:目录
    for item_tuple in ret_dict["body"]: # item_tuple形如 ('aa.txt', 34, 'f')
    if item_tuple[0] == file_name:
    can_download = True
    file_size = item_tuple[1]
    pwd = ret_dict["pwd"]
    file_type = item_tuple[-1]
    if can_download and file_type == "f":
    self.__download_handler_one_file(file_name, file_size, pwd)
    print()
    if can_download and file_type == "d":
    raw_pwd = self.__pwd_list # 存储用户最原始的pwd目录。
    self.__download_handler_one_folder(file_name, pwd)
    self.__pwd_list = raw_pwd # 把用户最原始的pwd目录再重新赋给self.__pwd_list变量中。
    else:
    print()

    def __download_handler_one_file(self, file_name: str, file_size=0, pwd=[]):
    # 发送下载单个文件的命令格式,下载的时候,要来回更新 download_cmd_dict['body']['start_index']的值
    download_cmd_dict = {
    'type': 'download',
    'body': {
    "file_name": file_name,
    "pwd": pwd,
    "start_index": 0, #
    "length": 1024 * 1024 # 使用UDP传输的数据,单次数据太大的话会报错
    },
    'pwd': pwd,
    'status': 1,
    'uuid': uuid.uuid1().hex
    }
    try:
    file_absolute = os.path.join(self.__download_root_path, *pwd, file_name)
    if not os.path.exists(os.path.dirname(file_absolute)):
    os.mkdir(os.path.dirname(file_absolute))
    with open(file_absolute, "wb") as fh:
    while True:
    download_cmd_bytes = pickle.dumps(download_cmd_dict)
    ret_dict = self.__recv_normal_cmd_handler(download_cmd_dict, download_cmd_bytes)
    if not ret_dict["status"]: # 下载途中失败了
    self.__logger.error("Download exception: " + str(ret_dict["body"]))
    break
    fh.write(ret_dict["body"]["content_bytes"])
    next_index = ret_dict["body"]["next_index"]
    if ret_dict["body"]["next_index"] == -1: # 说明已经把文件下载完了
    break

    download_cmd_dict["body"]["start_index"] = ret_dict["body"]["next_index"]
    download_cmd_dict["uuid"] = uuid.uuid1().hex
    self.__print_download_info(file_absolute, file_size, next_index)
    self.__print_download_successfully_info(file_absolute)
    except Exception:
    self.__logger.error("Download file exception !!!" + traceback.format_exc())

    def __download_handler_one_folder(self, file_name: str, pwd=[]):
    """
    下载一个文件夹。
    :param file_name: 即文件夹名字。进入到该方法,默认文件名是存在的
    :param pwd:
    :return:
    """
    if not os.path.exists(os.path.join(self.__download_root_path, *pwd, file_name)):
    os.makedirs(os.path.join(self.__download_root_path, *pwd, file_name))

    cmd_dict_ll = {'type': 'cmd',
    'body': 'll',
    'pwd': pwd, # 该参数会在后面的代码中进行替换更新
    'status': 1,
    'uuid': uuid.uuid1().hex
    }
    cmd_dict_cd = {'type': 'cmd',
    'body': 'cd ' + file_name, # 进入到的目录
    'pwd': pwd,
    'status': 1,
    'uuid': uuid.uuid1().hex
    }

    # 先进入指定目录
    recv_message_dict_cd = self.__recv_normal_cmd_handler(cmd_dict_cd, None)
    pwd = recv_message_dict_cd["pwd"] # 保存进入的目录
    # print("#### pwd: ", pwd)
    # 获取指定目录的文件列表
    cmd_dict_ll["pwd"] = pwd
    recv_message_dict_ll = self.__recv_normal_cmd_handler(cmd_dict_ll, None)
    # print("&&&&&&&v recv_message_dict_ll: ", recv_message_dict_ll)
    if recv_message_dict_ll["status"] and recv_message_dict_ll["body"]:
    for item_tuple in recv_message_dict_ll["body"]:
    # print("@@@@ item_tuple: ", item_tuple)
    if item_tuple[-1] == "f":
    self.__download_handler_one_file(file_name=item_tuple[0],
    file_size=item_tuple[1],
    pwd=pwd
    )
    elif item_tuple[-1] == "d":
    self.__download_handler_one_folder(file_name=item_tuple[0],
    pwd=pwd)

    def __download_handler_all_file(self):
    """
    下载当前目录下面的所有文件及文件夹
    :return:
    """
    if not os.path.exists(os.path.join(self.__download_root_path, *self.__pwd_list)):
    os.makedirs(os.path.join(self.__download_root_path, *self.__pwd_list))

    cmd_dict_ll = {'type': 'cmd',
    'body': 'll',
    'pwd': self.__pwd_list, # 该参数会在后面的代码中进行替换更新
    'status': 1,
    'uuid': uuid.uuid1().hex
    }
    recv_message_dict_ll = self.__recv_normal_cmd_handler(cmd_dict_ll, None)
    if recv_message_dict_ll["status"] and recv_message_dict_ll["body"]:
    for item_tuple in recv_message_dict_ll["body"]:
    if item_tuple[-1] == "f":
    self.__download_handler_one_file(file_name=item_tuple[0],
    file_size=item_tuple[1],
    pwd=recv_message_dict_ll["pwd"]
    )
    elif item_tuple[-1] == "d":
    self.__download_handler_one_folder(file_name=item_tuple[0],
    pwd=recv_message_dict_ll["pwd"]
    )


    def __recv_normal_cmd_handler(self, cmd_dict: dict, cmd_bytes: bytes=None, try_times=3) -> dict:
    """
    持续发送一条命令到客户端,直到正常接收到数据后结束
    :param cmd_dict: 形如 {'type': 'cmd', 'body': 'get aa.txt', 'pwd': [], 'status': 1,
    'uuid': '464899225dcb11ee9a91a46bb6f59f55'}
    :param cmd_bytes:
    :param try_times: 重试的次数,即

    :return: 形如 {'type': 'cmd', 'body': [('log', 0, 'd'), ('mylog.txt', 8, 'f')],
    'pwd': [], 'status': 1, 'uuid': '464899235dcb11eebf99a46bb6f59f55'}
    或者 {'type': 'cmd', 'body': {'file_name': 'logger.log', 'pwd': ['log'], 'content_bytes': b'2023-',
    'start_index': 0, 'next_index': 5},
    'pwd': [], 'status': 1, 'uuid': '464899235dcb11eebf99a46bb6f59f55'}
    """
    ret_dict = {}
    cmd_bytes = cmd_bytes if cmd_bytes else pickle.dumps(cmd_dict)
    try:
    for x in range(try_times):
    # print("cmd_dict_ll: ", cmd_dict)
    self.__socket_obj.sendto(cmd_bytes, self.__server_tuple) # 服务器端的地址
    recv_message_tuple = self.__socket_obj.recvfrom(1024 * 1024)
    recv_message_bytes = recv_message_tuple[0]
    recv_message_dict = pickle.loads(recv_message_bytes)
    if cmd_dict.get("uuid", "False") == recv_message_dict.get("uuid", "True"):
    ret_dict = recv_message_dict
    break
    time.sleep(0.1)
    except Exception:
    self.__logger.error("Recv normal cmd info exception !!!" + traceback.format_exc())
    finally:
    return ret_dict

    def __print_normal_info(self, recv_message_dict: dict):
    message_body = recv_message_dict.get("body", None)
    if isinstance(message_body, list):
    for item in message_body:
    print("%-20s %-20s %-20s" % (item[0], item[1], item[2]))
    print()
    return True
    if isinstance(message_body, str):
    print("%-20s" % message_body)
    print()
    return True
    return True

    def __print_pwd_info(self, recv_message_dict: dict):
    pwd_list = recv_message_dict.get("pwd", [])
    if not pwd_list:
    print("%-20s" % "/")
    else:
    pwd_str = '/' + '/'.join(pwd_list)
    print("%-20s" % pwd_str)
    print()
    return True

    def __print_download_info(self, file_name, file_size, next_index):
    sys.stdout.write("\r[%s] -->: %s" % (file_name, ("%.3f" % (next_index / file_size* 100) + "%")))

    def __print_download_successfully_info(self, file_name):
    """
    最终打印100%
    :param file_name:
    :return:
    """
    sys.stdout.write("\r[%s] -->: %s" % (file_name, "100%"))
    print()

    def run(self):
    while True:
    try:
    # 没有下载任务的话,则接收输入新的命令
    if not self.__is_downloading:
    cmd = input("Input: ")
    cmd_dict = {
    "type": "cmd",
    "body": cmd,
    "pwd": self.__pwd_list,
    "status": 1,
    "uuid": uuid.uuid1().hex
    }
    self.__cmd_handler(cmd_dict)
    print("================================================")
    else:
    time.sleep(1)
    except Exception:
    self.__logger.error("Client exception: " + traceback.format_exc())
    time.sleep(1)


    if __name__ == '__main__':
    ftp_client_obj = FTPClientUDP(host="127.0.0.1",
    port=8090,
    download_path="D:\my_download" # 客户端下载文件的根目录
    )
    ftp_client_obj.run()
     
    复制代码

     

     

  • 相关阅读:
    中间代码生成(Intermediate Code Generation)
    glTF格式详解
    金九银十,Java 程序员面试历程(附字节,阿里,百度,网易,美团等面经)
    leetcode 每日一题目 (树的直径 +DFS的深刻理解)
    java毕业设计基于的测试项目管理平台Mybatis+系统+数据库+调试部署
    深度学习:张量 介绍
    App逆向入门
    CSS 高阶小技巧 - 角向渐变的妙用!
    LabVIEW使用机器学习分类模型探索基于技能课程的学习
    总结数据结构-1
  • 原文地址:https://www.cnblogs.com/jiaconglei/p/17744683.html