• Flask——基于python完整实现客户端和服务器后端流式请求及响应


    看了很多相关博客,但是都没有本地客户端和服务器后端的完整代码示例,有的也只说了如何流式获取后端结果,基本没有讲两端如何同时实现流式输入输出,特此整理总结,给大家交流学习和使用!

    本地客户端

    • requests.post得到流式回复的重要参数:
      • stream:需要设置为True;
      • response.iter_content:使用该函数接收返回的流式数据。
    import requests
    import time
    import json
    
    def generate_stream_data():
        # 假设这是要发送的文本列表
        is_end = False
        lines = ["Hello", "world", "this", "is", "a", "stream", "of", "text"]
        for line in lines:
            print(line)
            if lines.index(line) == len(lines) - 1:
                is_end = True
            yield json.dumps({'line': line, 'is_end': is_end}) + '\n'
            time.sleep(0.5)
            # 模拟数据处理时间
    
    def get_stream_response(response):
        # 流式接收response
        rec_data_list = []
        temp_data = ''
        for chunk in response.iter_content(chunk_size=1):
            temp_data += chunk.decode('utf-8')
            if temp_data.endswith('\n'):
                temp_json = json.loads(temp_data)
                rec_data_list.append(temp_json)
                print(temp_data)
                temp_data = ''
                if temp_json['is_end']:
                    break
        print(rec_data_list)
        print("----------------------------")
        print(temp_data)
        return rec_data_list
    
    def stream_upload(url):
        
        # 流式接收response
        response = requests.post(url, data=generate_stream_data(), stream=True)
        
        final_response = get_stream_response(response)
        
        return final_response
    
    url = 'http://127.0.0.1:5000/stream'
    response = stream_upload(url)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    Flask服务器后端

    • flask.request流式获取数据::
      • 使用request.stream.read读取数据,而不是get_data()等一次性函数。
    from flask import Flask, Response, request
    import time
    import json
    import requests
    
    app = Flask(__name__)
    
    def process_stream_data(stream_data):
        # 假设这是要发送的数据
        print("开始生成新的数据流")
        is_end = False
        print(stream_data)
        for idx, line in enumerate(stream_data):
            if idx == len(stream_data)-1:
                is_end = True
            print(line)
            yield json.dumps(line)+"\n"
            time.sleep(0.5)
            # 模拟数据处理时间
    
    def get_stream_request(chunk_size=1):
        req_data_list = []
        temp_data = ''
        while True:
            chunk = request.stream.read(chunk_size)
            temp_data += chunk.decode('utf-8')
            if temp_data.endswith('\n'):
                temp_json = json.loads(temp_data)
                req_data_list.append(temp_json)
                print(temp_data)
                temp_data = ''
                if temp_json['is_end']:
                    return req_data_list
    
    @app.route('/stream', methods=['POST'])
    def stream_text():
        
        data = get_stream_request()
    
        print("----------------------------")
        
        return Response(process_stream_data(data))
    
    if __name__ == "__main__":
        app.run(host='0.0.0.0', port=5000, debug=True)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    客户端/服务器端流式接收[打字机]效果

    请添加图片描述

  • 相关阅读:
    晋级名单揭晓,中秋&国庆双节喜迎“梧桐杯”省级决赛!
    【C++】C++ 引用详解 ⑥ ( 普通变量 / 一级指针 / 二级指针 做函数参数的作用 )
    653. 钞票
    中外联合培养工商管理博士|社科大新加坡社科大学中文授课DBA
    Selenium之入门
    接口自动化测试注意事项
    CentOS 升级git
    web项目的搭建
    【快速阅读三】使用泊松融合实现单幅图的无缝拼贴及消除两幅图片直接的拼接缝隙。
    当个 PM 式程序员「GitHub 热点速览」
  • 原文地址:https://blog.csdn.net/qq_45779334/article/details/136226764