• python3 简易 http server:实现本地与远程服务器传大文件


    在个人目录下创建新文件httpserver.py

    vim httpserver.py
    
    • 1

    文件内容为python3代码:

    #!/usr/bin/env python3
    import datetime
    import email
    import html
    import http.server
    import io
    import mimetypes
    import os
    import posixpath
    import re
    import shutil
    import sys
    import urllib.error
    import urllib.parse
    import urllib.request
    from http import HTTPStatus
    
    __version__ = "0.1"
    __all__ = ["SimpleHTTPRequestHandler"]
    
    
    class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
        """HTTP request handler that serves files and accepts file uploads.

        GET and HEAD requests serve files and directory listings found below
        ``directory`` (the current working directory by default).  Every
        directory listing page carries an upload form; the multipart POST it
        produces is stored in the directory addressed by the request path.
        """

        server_version = "SimpleHTTP/" + __version__
        # Suffix -> content type overrides consulted by guess_type() before it
        # falls back to the mimetypes module.
        extensions_map = _encodings_map_default = {
            '.gz': 'application/gzip',
            '.Z': 'application/octet-stream',
            '.bz2': 'application/x-bzip2',
            '.xz': 'application/x-xz',
        }

        def __init__(self, *args, directory=None, **kwargs):
            """Remember the directory to serve, then handle the request.

            Args:
                directory: Root directory for translate_path(); defaults to
                    the current working directory.
            """
            if directory is None:
                directory = os.getcwd()
            self.directory = os.fspath(directory)
            super().__init__(*args, **kwargs)

        def do_GET(self):
            """Serve a GET request: send headers, then the body."""
            f = self.send_head()
            if f:
                try:
                    self.copyfile(f, self.wfile)
                finally:
                    f.close()

        def do_HEAD(self):
            """Serve a HEAD request: send headers only."""
            f = self.send_head()
            if f:
                f.close()

        def send_head(self):
            """Send the response line and headers for GET and HEAD.

            Returns:
                A binary file object positioned at the start of the body, or
                None when the response is already complete (redirect, error,
                or 304 Not Modified).  The caller must close the file.
            """
            # ``import email`` alone does not load the ``utils`` submodule, so
            # import it explicitly instead of relying on another module
            # having done so.
            import email.utils

            path = self.translate_path(self.path)
            f = None
            if os.path.isdir(path):
                parts = urllib.parse.urlsplit(self.path)
                if not parts.path.endswith('/'):
                    # redirect browser - doing basically what apache does
                    self.send_response(HTTPStatus.MOVED_PERMANENTLY)
                    new_parts = (parts[0], parts[1], parts[2] + '/',
                                 parts[3], parts[4])
                    new_url = urllib.parse.urlunsplit(new_parts)
                    self.send_header("Location", new_url)
                    self.end_headers()
                    return None
                for index in "index.html", "index.htm":
                    index = os.path.join(path, index)
                    if os.path.exists(index):
                        path = index
                        break
                else:
                    return self.list_directory(path)

            ctype = self.guess_type(path)
            # A trailing slash on something that is not a directory cannot
            # name a regular file.
            if path.endswith("/"):
                self.send_error(HTTPStatus.NOT_FOUND, "File not found")
                return None
            try:
                f = open(path, 'rb')
            except OSError:
                self.send_error(HTTPStatus.NOT_FOUND, "File not found")
                return None
            try:
                fs = os.fstat(f.fileno())
                # Use browser cache if possible
                if ("If-Modified-Since" in self.headers
                        and "If-None-Match" not in self.headers):
                    # compare If-Modified-Since and time of last file
                    # modification
                    try:
                        ims = email.utils.parsedate_to_datetime(
                            self.headers["If-Modified-Since"])
                    except (TypeError, IndexError, OverflowError, ValueError):
                        # ignore ill-formed values
                        pass
                    else:
                        if ims.tzinfo is None:
                            # obsolete format with no timezone, cf.
                            # https://tools.ietf.org/html/rfc7231#section-7.1.1.1
                            ims = ims.replace(tzinfo=datetime.timezone.utc)
                        if ims.tzinfo is datetime.timezone.utc:
                            # compare to UTC datetime of last modification
                            last_modif = datetime.datetime.fromtimestamp(
                                fs.st_mtime, datetime.timezone.utc)
                            # remove microseconds, like in If-Modified-Since
                            last_modif = last_modif.replace(microsecond=0)

                            if last_modif <= ims:
                                self.send_response(HTTPStatus.NOT_MODIFIED)
                                self.end_headers()
                                f.close()
                                return None

                self.send_response(HTTPStatus.OK)
                self.send_header("Content-type", ctype)
                self.send_header("Content-Length", str(fs.st_size))
                self.send_header("Last-Modified",
                                 self.date_time_string(fs.st_mtime))
                self.end_headers()
                return f
            except BaseException:
                # Never leak the file handle, whatever went wrong.
                f.close()
                raise

        def list_directory(self, path):
            """Send headers for an HTML directory listing with an upload form.

            Args:
                path: Filesystem path of the directory to list.

            Returns:
                A BytesIO holding the encoded page, or None if the directory
                could not be read (an error response has then been sent).
            """
            try:
                list_dir = os.listdir(path)
            except OSError:
                self.send_error(HTTPStatus.NOT_FOUND,
                                "No permission to list_dir directory")
                return None
            list_dir.sort(key=lambda a: a.lower())
            r = []
            try:
                display_path = urllib.parse.unquote(self.path,
                                                    errors='surrogatepass')
            except UnicodeDecodeError:
                display_path = urllib.parse.unquote(path)
            display_path = html.escape(display_path, quote=False)
            enc = sys.getfilesystemencoding()

            # The file input must be named "file": deal_post_data() looks for
            # exactly that field name in the Content-Disposition header.
            form = (
                '<h1>文件上传</h1>\n'
                '<form ENCTYPE="multipart/form-data" method="post">\n'
                '<input name="file" type="file"/>\n'
                '<input type="submit" value="upload"/>\n'
                '</form>\n'
            )
            title = 'Directory listing for %s' % display_path
            r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
                     '"http://www.w3.org/TR/html4/strict.dtd">')
            r.append('<html>\n<head>')
            r.append('<meta http-equiv="Content-Type" '
                     'content="text/html; charset=%s">' % enc)
            r.append('<title>%s</title>\n</head>' % title)
            r.append('<body>%s\n<h1>%s</h1>' % (form, title))
            r.append('<hr>\n<ul>')
            for name in list_dir:
                fullname = os.path.join(path, name)
                displayname = linkname = name
                # Append / for directories or @ for symbolic links
                if os.path.isdir(fullname):
                    displayname = name + "/"
                    linkname = name + "/"
                if os.path.islink(fullname):
                    displayname = name + "@"
                    # Note: a link to a directory displays with @ and links
                    # with /
                r.append('<li><a href="%s">%s</a></li>'
                         % (urllib.parse.quote(linkname,
                                               errors='surrogatepass'),
                            html.escape(displayname, quote=False)))
            r.append('</ul>\n<hr>\n</body>\n</html>\n')
            # 'surrogateescape' (one word) is the registered handler name.
            encoded = '\n'.join(r).encode(enc, 'surrogateescape')
            f = io.BytesIO()
            f.write(encoded)
            f.seek(0)
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-type", "text/html; charset=%s" % enc)
            self.send_header("Content-Length", str(len(encoded)))
            self.end_headers()
            return f

        def translate_path(self, path):
            """Map a request path onto a filesystem path under self.directory.

            Query string and fragment are dropped, and components that are
            not plain file or directory names are ignored.
            """
            # abandon query parameters
            path = path.split('?', 1)[0]
            path = path.split('#', 1)[0]
            # Don't forget explicit trailing slash when normalizing.
            # Issue17324
            trailing_slash = path.rstrip().endswith('/')
            try:
                path = urllib.parse.unquote(path, errors='surrogatepass')
            except UnicodeDecodeError:
                path = urllib.parse.unquote(path)
            path = posixpath.normpath(path)
            words = filter(None, path.split('/'))
            path = self.directory
            for word in words:
                if os.path.dirname(word) or word in (os.curdir, os.pardir):
                    # Ignore components that are not a simple file/directory
                    # name
                    continue
                path = os.path.join(path, word)
            if trailing_slash:
                path += '/'
            return path

        def copyfile(self, source, outputfile):
            """Copy all data from one file-like object to another."""
            shutil.copyfileobj(source, outputfile)

        def guess_type(self, path):
            """Return the content type to send for the file at ``path``.

            extensions_map is tried with the exact suffix, then with the
            lower-cased suffix, then mimetypes is asked; the final fallback
            is 'application/octet-stream'.
            """
            base, ext = posixpath.splitext(path)
            if ext in self.extensions_map:
                return self.extensions_map[ext]
            ext = ext.lower()
            if ext in self.extensions_map:
                return self.extensions_map[ext]
            guess, _ = mimetypes.guess_type(path)
            if guess:
                return guess
            return 'application/octet-stream'

        def do_POST(self):
            """Store an uploaded file and answer with an HTML result page."""
            ok, info = self.deal_post_data()
            # Hand the values over as arguments: log_message() applies the
            # %-format itself, so a pre-formatted message containing '%'
            # (for instance from a file name) would make it raise.
            self.log_message('%s, %s => %s', ok, info, self.client_address)
            enc = sys.getfilesystemencoding()
            # info can contain the client-supplied file name and the Referer
            # header is client-controlled too, so escape both.
            safe_info = html.escape(info, quote=False)
            back_url = html.escape(self.headers.get('referer') or self.path,
                                   quote=True)
            res = [
                '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
                '"http://www.w3.org/TR/html4/strict.dtd">',
                '<html>\n<head>',
                '<meta http-equiv="Content-Type" '
                'content="text/html; charset=%s">' % enc,
                '<title>%s</title>\n</head>' % "Upload Result Page",
                '<body><h1>%s</h1>\n' % "Upload Result",
            ]
            if ok:
                res.append('<p>SUCCESS: %s</p>\n' % safe_info)
            else:
                res.append('<p>FAILURE: %s</p>' % safe_info)
            res.append('<a href="%s">back</a>' % back_url)
            res.append('</body></html>')
            encoded = '\n'.join(res).encode(enc, 'surrogateescape')
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-type", "text/html; charset=%s" % enc)
            self.send_header("Content-Length", str(len(encoded)))
            self.end_headers()
            self.wfile.write(encoded)

        def deal_post_data(self):
            """Read one multipart/form-data file upload and write it to disk.

            The file is stored under the directory that the request path
            translates to, using only the base name of the uploaded file name.

            Returns:
                A ``(success, message)`` tuple; ``message`` is a
                human-readable description of the outcome.
            """
            content_type = self.headers['content-type']
            if not content_type or 'boundary=' not in content_type:
                return False, "Content-Type header doesn't contain boundary"
            boundary = content_type.split('boundary=', 1)[1]
            boundary = boundary.split(';', 1)[0].strip().strip('"').encode()
            if not boundary:
                return False, "Content-Type header doesn't contain boundary"
            try:
                remain_bytes = int(self.headers['content-length'])
            except (TypeError, ValueError):
                return False, "Missing or invalid Content-Length header"

            # First line of the body: the opening boundary.
            line = self.rfile.readline()
            remain_bytes -= len(line)
            if boundary not in line:
                return False, "Content NOT begin with boundary"
            # Second line: Content-Disposition carrying the file name.
            line = self.rfile.readline()
            remain_bytes -= len(line)
            fn = re.findall(
                r'Content-Disposition.*name="file"; filename="(.*)"',
                line.decode('utf-8', 'replace'))
            # Keep only the final path component so that a crafted name such
            # as "../../x" cannot escape the target directory.
            name = os.path.basename(fn[0].replace('\\', '/')) if fn else ''
            if not name or name in (os.curdir, os.pardir):
                return False, "Can't find out file name..."
            path = self.translate_path(self.path)
            fn = os.path.join(path, name)
            # Skip the part's Content-Type line and the blank separator line.
            line = self.rfile.readline()
            remain_bytes -= len(line)
            line = self.rfile.readline()
            remain_bytes -= len(line)
            try:
                out = open(fn, 'wb')
            except OSError:
                return False, ("Can't create file to write, "
                               "do you have permission to write?")

            # Stay one line behind the reader: the line break that precedes
            # the closing boundary belongs to the multipart framing, not to
            # the file, and has to be stripped from the last data line.
            with out:
                preline = self.rfile.readline()
                remain_bytes -= len(preline)
                while remain_bytes > 0:
                    line = self.rfile.readline()
                    if not line:
                        # Client closed the connection early; without this
                        # check remain_bytes would never reach zero.
                        break
                    remain_bytes -= len(line)
                    if boundary in line:
                        preline = preline[0:-1]
                        if preline.endswith(b'\r'):
                            preline = preline[0:-1]
                        out.write(preline)
                        return True, "File '%s' upload success!" % fn
                    out.write(preline)
                    preline = line
            return False, "Unexpect Ends of data."
if __name__ == '__main__':
    # Script entry point: the optional first command-line argument is the
    # TCP port to listen on.
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        # No argument, or one that is not an integer: use the default port
        # and tell the user how to choose another.
        port = 8000
        print('-------->> Warning: Port is not given, will use deafult port: 8000 ')
        print('-------->> if you want to use other port, please execute: ')
        print('-------->> python SimpleHTTPServerWithUpload.py port ')
        print("-------->> port is a integer and it's range: 1024 < port < 65535 ")

    http.server.test(
        HandlerClass=SimpleHTTPRequestHandler,
        ServerClass=http.server.HTTPServer,
        port=port,
    )
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301

    在需要暴露的目录下启动http服务,如/data/codes/

    cd /data/codes/
    python3 httpserver.py 8888
    
    • 1
    • 2

    随后在个人电脑访问http://ip:8888即可浏览文件、上传文件:
    在这里插入图片描述

  • 相关阅读:
    LaTeX学习笔记
    刷题记录(NC13822 Keep In Line,NC16663 合并果子,NC16430 蚯蚓)
    Linux下Makefile操作
    【C语言进阶(14)】程序的编译与链接
    说说安腾处理器的双栈设计
    外包干了10个月,技术退步明显.......
    区块链是什么?
    使用IntelliJ IDEA本地启动调试Flink流计算工程的2个异常解决
    最佳实践 | 顶尖咨询公司常用的解决问题工具
    STL:stack和queue
  • 原文地址:https://blog.csdn.net/winter2121/article/details/132694290