• Python 实现http server接收multipart/form-data文件 方法2


    Python 实现http server接收multipart/form-data文件 方法2

    1 Server端代码

    以下代码亲测有效,仅适用于接收客户端上传的图片,其他文件未曾测试,作者主要应用于平时的自测工具。
    说明1: 如有特殊需求,请自行更改,有意见,评论区留言。
    说明2: 建议使用这个方法用于生产、成熟软件开发:《Python 实现http server接收multipart/form-data文件 方法1》

    from dataclasses import replace
    from http.server import HTTPServer, BaseHTTPRequestHandler
    import json,os,re,chardet,posixpath,codecs
    from urllib.parse import unquote
    
    class Resquest(BaseHTTPRequestHandler):
        """Handle POST requests carrying a JSON body or a multipart/form-data upload.

        File parts of a multipart body are written below the directory that the
        module-level ``translate_path`` derives from the request path. Every
        request is answered with a small JSON document so the client always
        receives an HTTP response.
        """

        # One or more consecutive literal ``\xNN`` escape sequences.
        _HEX_ESCAPE_RUN = re.compile(r'(?:\\x[0-9a-fA-F]{2})+')
        # The ``filename`` parameter of a part's Content-Disposition header.
        _FILENAME_PARAM = re.compile(r';\s*filename="?([^";\r\n]*)"?', re.IGNORECASE)

        def do_POST(self):
            """Dispatch a POST request and always send a JSON response.

            A request whose Content-Type carries a ``boundary`` parameter is
            treated as a multipart upload and handed to ``deal_post_data``;
            anything else is read as a UTF-8 JSON document. Failures are
            reported with status 400 instead of leaving the client without
            a reply.
            """
            print("headers:", self.headers)
            print("command:", self.command)
            print("path:", self.path)

            boundary = self.headers.get_boundary()
            if boundary is not None:
                print(" = in headers")
                print("boundary:", boundary)
                print("parser image")
                ok, detail = self.deal_post_data()
                print(detail)
                self._send_json(200 if ok else 400, {"success": ok, "detail": detail})
                return

            print(" = not in headers")
            try:
                length = int(self.headers.get('content-length') or 0)
            except ValueError:
                self._send_json(400, {"success": False, "detail": "invalid Content-Length"})
                return
            req_datas = self.rfile.read(length)
            print("--------------------接受client发送的数据----------------")
            try:
                res = json.loads(req_datas.decode('utf-8'))
            except ValueError as exc:
                # UnicodeDecodeError and json.JSONDecodeError are both ValueError subclasses.
                print("invalid JSON body:", exc)
                self._send_json(400, {"success": False, "detail": "invalid JSON body"})
                return
            print(res)
            print("--------------------接受client发送的数据完毕------------------")
            self._send_json(200, {"success": True})

        def _send_json(self, status, payload):
            """Send ``payload`` as a UTF-8 JSON response with the given status code."""
            body = json.dumps(payload, ensure_ascii=False).encode('utf-8')
            self.send_response(status)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        @classmethod
        def _filename_from_header(cls, header_line):
            """Return the sanitised file name from a Content-Disposition line, or ''."""
            text = header_line.decode('utf-8', errors='replace')
            match = cls._FILENAME_PARAM.search(text)
            if match is None:
                return ""
            # Keep only the last path component so a client-supplied name such
            # as "../../x" or "C:\\dir\\x" cannot escape the upload directory.
            name = os.path.basename(match.group(1).replace("\\", "/")).strip()
            if name in (os.curdir, os.pardir):
                return ""
            return name

        @staticmethod
        def _copy_part(read_line, delimiter, sink):
            """Stream one part's payload into ``sink`` up to the next delimiter.

            ``sink`` may be None to discard the payload. Returns the delimiter
            line that ended the part, or ``b""`` if the body ended first.
            """
            held = b""
            while True:
                line = read_line()
                if not line:
                    # Body ended without a closing delimiter: keep what we have.
                    if sink is not None:
                        sink.write(held)
                    return b""
                if line.startswith(delimiter):
                    # The line break right before a delimiter belongs to the
                    # delimiter, not to the payload.
                    if held.endswith(b"\r\n"):
                        held = held[:-2]
                    elif held.endswith(b"\n"):
                        held = held[:-1]
                    if sink is not None:
                        sink.write(held)
                    return line
                if sink is not None:
                    sink.write(held)
                held = line

        def deal_post_data(self):
            """Save every file part of a multipart/form-data request body.

            Parts without a file name (ordinary form fields) are skipped.

            Returns:
                A ``(ok, info)`` tuple. On success ``ok`` is True and ``info``
                is a list holding one message per saved file. On failure
                ``ok`` is False and ``info`` is a string naming the problem.
            """
            boundary = self.headers.get_boundary()
            if not boundary:
                return False, "Content NOT begin with boundary"
            print("boundary===", boundary)
            delimiter = b"--" + boundary.encode('utf-8')
            closing = delimiter + b"--"

            try:
                remain_bytes = int(self.headers.get('content-length') or 0)
            except ValueError:
                remain_bytes = 0
            print("remain_bytes===", remain_bytes)

            def read_line():
                # Stay within Content-Length; reading beyond it could block
                # waiting for data the client never sends.
                nonlocal remain_bytes
                if remain_bytes <= 0:
                    return b""
                data = self.rfile.readline(remain_bytes)
                remain_bytes -= len(data)
                return data

            line = read_line()
            print("line=", line)
            if not line.startswith(delimiter):
                return False, "Content NOT begin with boundary"

            res = []
            # At the top of each iteration ``line`` is a part delimiter.
            while line.strip() != closing:
                fname = ""
                # Consume the part headers up to the blank separator line.
                while True:
                    header = read_line()
                    if not header:
                        return False, "Unexpect Ends of data."
                    if header in (b"\r\n", b"\n"):
                        break
                    if header.lower().startswith(b"content-disposition"):
                        fname = self._filename_from_header(header)
                print("fname==", fname)

                if not fname:
                    line = self._copy_part(read_line, delimiter, None)
                else:
                    fn = os.path.join(translate_path(self.path), fname)
                    print("SavefilePath==", fn)
                    try:
                        os.makedirs(os.path.dirname(fn), exist_ok=True)
                        out = open(fn, 'wb')
                    except OSError:
                        return False, "Can't create file to write, do you have permission to write?"
                    with out:
                        line = self._copy_part(read_line, delimiter, out)
                    if line:
                        res.append("File '%s' upload success!" % fn)

                if not line:
                    return False, "Unexpect Ends of data."

            if not res:
                return False, "Can't find out file name..."
            return True, res

        def str_to_chinese(self, var):
            """Turn literal ``\\xNN`` escape text inside ``var`` back into characters.

            Each run of consecutive ``\\xNN`` sequences is read as UTF-8 bytes,
            so characters of any encoded length are handled. A run that is not
            valid UTF-8 is left unchanged. ``deal_post_data`` no longer needs
            this because it decodes header bytes directly; the method is kept
            for callers that still pass ``str(bytes)`` style text.
            """
            def _decode_run(match):
                raw = bytes.fromhex(match.group(0).replace("\\x", ""))
                try:
                    return raw.decode('utf-8')
                except UnicodeDecodeError:
                    return match.group(0)

            return self._HEX_ESCAPE_RUN.sub(_decode_run, var)
    
    def translate_path(path):
        """Map a URL path onto a location below the current working directory.

        The query string and fragment are dropped, percent-escapes are decoded
        and the path is normalised. Each remaining component is then reduced to
        its final name (drive and directory prefixes are discarded), and
        components equal to the current- or parent-directory markers are
        skipped before being joined onto ``os.getcwd()``.
        """
        without_query = path.partition('?')[0]
        without_fragment = without_query.partition('#')[0]
        normalized = posixpath.normpath(unquote(without_fragment))

        local_path = os.getcwd()
        for segment in normalized.split('/'):
            if not segment:
                # Empty pieces come from leading or doubled slashes.
                continue
            leaf = os.path.split(os.path.splitdrive(segment)[1])[1]
            if leaf == os.curdir or leaf == os.pardir:
                continue
            local_path = os.path.join(local_path, leaf)
        return local_path
            
    if __name__ == '__main__':
        # Script entry point: serve Resquest on a fixed address until the
        # process is stopped. The address is hard-coded, so it has to be
        # edited to match an interface of the machine running the script.
        listen_addr = ('192.168.1.16', 18098)
        httpd = HTTPServer(listen_addr, Resquest)
        print("Starting server, listen at: %s:%s" % listen_addr)
        httpd.serve_forever()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175

    2 测试

    参见链接文章中的客户端测试方法进行测试。
    如有其他需求或问题,烦请评论区留言。
    Python 实现http server接收multipart/form-data文件 方法1

  • 相关阅读:
    【Java】运算符
    基于SpringBoot+Vue的公园管理系统的详细设计和实现(源码+lw+部署文档+讲解等)
    21-SASS
    【Java】【设计模式】单例模式
    温度、机械振动等对电子产品的影响
    基于javaweb音乐网站管理系统
    Java学生管理系统
    关于promise讲解
    Spring Cloud Gateway整合Swagger聚合微服务系统API文档(非Zuul)
    Qt QCustomPlot 踩坑记录
  • 原文地址:https://blog.csdn.net/shengzhe8688/article/details/133888062