• Github: ksrpc原码解读---HTTP异步文件交互


    在ksrpc库中,采用了异步文件交互的方式,值得一看。

    https://github.com/wukan1986/ksrpc/blob/main/ksrpc/connections/http.py
    
    • 1
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author     :wukan
    # @License    :(C) Copyright 2022, wukan
    # @Date       :2022-05-15
    
    """
    HTTP客户端
    默认提交pickle格式,返回也是pickle格式
    1. RequestsConnection,使用requests实现。原本只支持同步,但此处已经改成支持异步
    2. HttpxConnection,使用httpx实现。使用异步,但也可以指定成同步使用
    """
    from io import BytesIO
    from urllib.parse import urlparse
    
    import pandas as pd
    
    from ..model import Format
    from ..serializer.json_ import dict_to_obj
    from ..serializer.pkl_gzip import deserialize, serialize
    from ..utils.async_ import to_sync
    
    
    def process_response(r):
        """处理HTTP响应。根据不同的响应分别处理
        Parameters
        ----------
        r: Response
        Returns
        -------
        object
        json
        csv
        """
        if r.status_code != 200:
            raise Exception(f'{r.status_code}, {r.text}')
        content_type = r.headers['content-type']
        if content_type.startswith('application/octet-stream'):
            data = deserialize(r.content)
            if data['status'] == 200:
                return data['data']
            return data
        elif content_type.startswith('application/json'):
            data = r.json()
            if data.get('type', None) in ('DataFrame', 'Series', 'ndarray'):
                return dict_to_obj(data['data'])
            if data['status'] == 200:
                return data['data']
            return data
        elif content_type.startswith('text/plain'):
            # 纯文本,表示返回的CSV格式
            return pd.read_csv(BytesIO(r.content), index_col=0)
    
    
    class HttpxConnection:
        """使用httpx实现的客户端连接支持同步和异步"""
        # 超时,请求超求和响应超时,秒
        timeout = (5, 30)
    
        def __init__(self, url, token=None):
            import httpx
    
            path = urlparse(url).path
            assert path.endswith(('/get', '/post', '/file')), 'Python语言优先使用file,其它语言使用post'
            if path.endswith('/file'):
                self._fmt = Format.PKL_GZIP
            else:
                self._fmt = Format.JSON
    
            self._url = url
            self._token = token
            self._client = httpx.AsyncClient()
    
        async def __aenter__(self):
            """异步async with"""
            await self._client.__aenter__()
            return self
    
        async def __aexit__(self, exc_type=None, exc_value=None, traceback=None):
            """异步async with"""
            await self._client.__aexit__(exc_type, exc_value, traceback)
    
        def __enter__(self):
            """同步with"""
            to_sync(self._client.__aenter__)()
            return self
    
        def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
            """同步with"""
            to_sync(self._client.__aexit__)()
    
        async def call(self, func, args, kwargs,
                       fmt: Format = Format.PKL_GZIP,
                       cache_get: bool = True, cache_expire: int = 3600,
                       async_remote=True):
            """调用函数
            Parameters
            ----------
            func: str
                函数全名
            args: tuple
                函数位置参数
            kwargs: dict
                函数命名参数
            fmt: Format
                指定响应格式
            cache_get: bool
                是否优先从缓存中获取
            cache_expire: int
                指定缓存超时。超时此时间将过期,指定0表示不进行缓存
            async_remote: bool
                异步方式调用
            """
            # 如果是JSON格式可以考虑返回CSV
            rsp_fmt = self._fmt
    
            # httpx解析枚举有问题,只能提前转成value,而requests没有此问题
            params = dict(func=func, fmt=rsp_fmt.value,
                          cache_get=cache_get, cache_expire=cache_expire, async_remote=async_remote)
            data = {'args': args, 'kwargs': kwargs}
            headers = None if self._token is None else {"Authorization": f"Bearer {self._token}"}
    
            if self._fmt == Format.PKL_GZIP:
                files = {"file": serialize(data).read()}
                r = await self._client.post(self._url, headers=headers, params=params, timeout=self.timeout, files=files)
            elif self._fmt == Format.JSON:
                r = await self._client.post(self._url, headers=headers, params=params, timeout=self.timeout, json=data)
    
            return process_response(r)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
  • 相关阅读:
    indexOf()与incldes()的异同 -- js随记
    HIve数仓新零售项目DWB层的构建
    技术分享| Etcd如何实现分布式负载均衡及分布式通知与协调
    tf+keras-pytorch-3.7-cpu环境
    Tuxera NTFS 2022 for Mac破解版百度网盘免费下载安装激活教程
    Neural Sewing Machine (NSM)
    使用 htmx 构建交互式 Web 应用
    【Redis学习笔记】第七章 Redis事务
    编译原理—语法制导翻译、S属性、L属性、自上而下、自下而上计算
    神经网络的三种训练方法,神经网络训练速度
  • 原文地址:https://blog.csdn.net/wowotuo/article/details/126663259