• 实现http流式输出的最小实践


    engine端

    1. import asyncio
    2. class AsyncStream:
    3. #实现了__aiter__和__anext__表示这是一个异步队列,而不是同步队列,所以不能用for _ in 访问
    4. #内部元素,只能用async for _ in 访问
    5. def __init__(self, request_id: str) -> None:
    6. self.request_id = request_id
    7. self._queue = asyncio.Queue()
    8. self._finished = False
    9. def put(self, item) -> None:
    10. if self._finished:
    11. return
    12. self._queue.put_nowait(item)
    13. def finish(self) -> None:
    14. self._queue.put_nowait(StopIteration)
    15. self._finished = True
    16. @property
    17. def finished(self) -> bool:
    18. return self._finished
    19. def __aiter__(self):
    20. return self
    21. async def __anext__(self):
    22. result = await self._queue.get()
    23. if result is StopIteration:
    24. raise StopAsyncIteration
    25. elif isinstance(result, Exception):
    26. raise result
    27. return result
    28. class Engine():
    29. def __init__(self):
    30. self.items = ['my', 'name', 'is', 'lewis']
    31. self.stream = AsyncStream()
    32. async def generate(self):
    33. #这是一个异步生成器
    34. async for request_output in self.stream:
    35. print(request_output)
    36. yield request_output
    37. def get_engine():
    38. return Engine()

    server端

    1. import argparse
    2. import json
    3. from typing import AsyncGenerator
    4. from fastapi import FastAPI, Request
    5. from fastapi.responses import JSONResponse, Response, StreamingResponse
    6. import uvicorn
    7. from api_engine import get_engine
    8. TIMEOUT_KEEP_ALIVE = 5 # seconds.
    9. TIMEOUT_TO_PREVENT_DEADLOCK = 1 # seconds.
    10. app = FastAPI()
    11. engine = get_engine()
    12. @app.post("/generate")
    13. async def generate(request: Request) -> Response:
    14. for item in engine.items:
    15. print('push:', item)
    16. engine.stream.put(item)
    17. results_generator = engine.generate()
    18. # Streaming case
    19. async def stream_results() -> AsyncGenerator[bytes, None]:
    20. async for request_output in results_generator:
    21. ret = {"text": request_output}
    22. yield (json.dumps(ret) + "\0").encode("utf-8")
    23. return StreamingResponse(stream_results())
    24. if __name__ == "__main__":
    25. parser = argparse.ArgumentParser()
    26. parser.add_argument("--port", type=int, default=8000)
    27. args = parser.parse_args()
    28. uvicorn.run(app,
    29. host='localhost',
    30. port=args.port,
    31. log_level="debug",
    32. timeout_keep_alive=TIMEOUT_KEEP_ALIVE)

    client端

    1. import requests
    2. import json
    3. from typing import Iterable, List
    4. def get_streaming_response(response: requests.Response) -> Iterable[List[str]]:
    5. for chunk in response.iter_lines(chunk_size=8192,
    6. decode_unicode=False,
    7. delimiter=b"\0"):
    8. if chunk:
    9. data = json.loads(chunk.decode("utf-8"))
    10. output = data["text"]
    11. yield output
    12. def post_http_request(
    13. api_url: str,
    14. ) -> requests.Response:
    15. headers = {"User-Agent": "Test Client"}
    16. response = requests.post(api_url, headers=headers, stream=True)
    17. return response
    18. if __name__ == "__main__":
    19. api_url = f"http://localhost:8000/generate"
    20. response = post_http_request(api_url)
    21. num_printed_lines = 0
    22. for h in get_streaming_response(response):
    23. print(h)

  • 相关阅读:
    Kyligence李栋:从数据湖到指标中台,提升数据分析ROI
    计算机二级access
    instanceclient安装与配置
    seata相关图形,dljd,cat
    三、工厂方法模式
    String字符串方法
    机器学习-K近邻算法
    第3章 Thymeleaf模板渲染
    K_A08_005 基于 STM32等单片机驱动XY-160D模块按键控制直流电机正反转加减速启停
    ms office学习记录10:Excel㈣
  • 原文地址:https://blog.csdn.net/zhuikefeng/article/details/134052626