• Thrift: Python RPC in Practice


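    RPC generally moves data more efficiently than an HTTP web API, so I decided to try an RPC approach in practice.

    The best way to learn Thrift is to find a dedicated library on GitHub and read its examples (https://github.com/Thriftpy/thriftpy2).

    I ran this POC on an Alibaba Cloud server and hit a few pitfalls, which I share here for reference.

    1. Sockets that would not connect
    2. Transferring data of type binary
    3. Security group settings

    Issues 1-3 have all been resolved; the fixes are described in this article.

    Related libraries: thriftpy or thriftpy2

    I. Method 1: the thriftpy library

    Note that the code in this method imports the Apache thrift Python package and uses gen-py code produced by the thrift compiler.

    1. The tf_service.thrift file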

    service Min_Bar_Service {
      string min_bar(1: i32 id,
                  2: string context)
                  
    }
    

    2. Server:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import sys
    sys.path.append("/home/dataapi/.local/lib/python3.8/site-packages")
    # Add the gen-py directory itself so that the tf_service package is importable
    sys.path.append('/home/dataapi/rpc_server/server/gen-py')

    from thrift.protocol import TBinaryProtocol
    from thrift.server import THttpServer
    from tf_service import Min_Bar_Service


    class Min_Bar_ServiceHandler:
        def min_bar(self, id, context):
            return context + ', id: ' + str(id)

    handler = Min_Bar_ServiceHandler()
    processor = Min_Bar_Service.Processor(handler)

    pfactory = TBinaryProtocol.TBinaryProtocolFactory()
    server = THttpServer.THttpServer(processor, ('127.0.0.1', 9090), pfactory)
    # THttpServer constructor, for reference:
    # def __init__(self,
    #                  processor,
    #                  server_address,
    #                  iprot_factory,
    #                  server_class=http_server.HTTPServer):
    # About THttpServer:
    # https://github.com/Thriftpy/thriftpy/blob/0e606f82a3c900e663b63d69f68fc304c5d58dee/thriftpy/http.py
    print('Starting the server (THttpServer mode)...')
    server.serve()
    print('done.')
    

    3. Client:

    import sys
    # sys.path.append('/home/songroom/thrift_client/gen-py')  # adjust to your own path
    sys.path.append('/home/dataapi/rpc_server/server/gen-py/')
    from tf_service import Min_Bar_Service
    from thrift.transport import THttpClient
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol

    def get_http_uri(host, scheme, port, path):
        # e.g. host = 'xxx', scheme = 'http', port = 5000, path = '/tf_service'
        # HTTP_URI = '{scheme}://{host}:{port}{path}'
        http_uri = "{}://{}:{}{}".format(scheme, host, port, path)  # 'http://xxx:5000/tf_service'
        return http_uri

    # >>> get_http_uri("47.122.10.1", "http", 5000, "/tf_service")
    # 'http://47.122.10.1:5000/tf_service'

    host   = "127.0.0.1"  # or the server address, e.g. "48.123.10.1"
    scheme = "http"
    port   = "9090"
    path   = ""
    http_uri = get_http_uri(host, scheme, port, path)
    print(f"http_uri : {http_uri}")
    transport = THttpClient.THttpClient(http_uri)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client   = Min_Bar_Service.Client(protocol)

    # Connect!
    transport.open()

    id   = 42
    context = 'welcome to thrift rpc dataapi! '
    msg   = client.min_bar(id, context)
    print(msg)
    transport.close()
    
    
    

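    4. Generate the gen-py folder on both the client and the server

    For example, I use the tf_service.thrift file to define the interface. Put a copy of this file on both the server and the client (or create it on each), then run the following on both sides: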

    thrift -gen py tf_service.thrift
    

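    5. Start server.py, then client.py, and that is all.

    II. Method 2: thriftpy2

    thriftpy2 is the maintained successor (a fork) of thriftpy. It loads the .thrift file at runtime, so the gen-py step is not needed and the code is more concise. The approach is otherwise much the same.
    The .thrift file is still required on both the client and the server, since it defines the data types being exchanged.
    Generally speaking, this library is now the more convenient choice: smooth and concise.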

    server:

    import thriftpy2
    from thriftpy2.rpc import make_server


    class Min_Bar_ServiceHandler:
        def min_bar(self, id, context):
            return context + ', id: ' + str(id)

    # thriftpy2 requires module_name to end with "_thrift"
    tf_service_thrift = thriftpy2.load("tf_service.thrift", module_name="tf_service_thrift")
    server = make_server(tf_service_thrift.Min_Bar_Service, Min_Bar_ServiceHandler(), '127.0.0.1', 9090)
    server.serve()
    
    

    client:

    import thriftpy2
    from thriftpy2.rpc import make_client

    # thriftpy2 requires module_name to end with "_thrift"
    tf_service_thrift = thriftpy2.load("tf_service.thrift", module_name="tf_service_thrift")

    client = make_client(tf_service_thrift.Min_Bar_Service, '127.0.0.1', 9090)
    id   = 42
    context = 'welcome to thrift rpc dataapi! '
    msg   = client.min_bar(id, context)
    print(msg)
    

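    III. Questions

    1. How can this be exposed to web users like a web API? Is it enough to change host = "127.0.0.1" to "48.123.10.1"?
    2. How much faster is this than a web API over HTTP?

    On question 1: network settings.
    On the server side, in production, set host to "0.0.0.0" so that the server listens on all network interfaces. Do not set it to the server's own IP "48.123.10.1", and do not set it to "127.0.0.1". Pay special attention to this; otherwise the socket will appear permanently unreachable, no matter how carefully you configure the security group inbound and outbound rules or verify the firewall settings.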
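
    A plain socket test is a quick way to see the difference between the two bind addresses and to check whether a port is reachable at all. The sketch below uses only the standard library; can_connect and bound_address are hypothetical helper names, and the listener uses an ephemeral port rather than the 9090 used in this article.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def bound_address(host: str) -> str:
    """Bind a throwaway socket to host on an ephemeral port and report the bound IP."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[0]


# A listener bound to 0.0.0.0 accepts connections on every local interface,
# including loopback; one bound to 127.0.0.1 accepts local connections only.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("0.0.0.0", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
port = listener.getsockname()[1]

reachable = can_connect("127.0.0.1", port)
listener.close()

print("wildcard bind :", bound_address("0.0.0.0"))
print("loopback bind :", bound_address("127.0.0.1"))
print("reachable     :", reachable)
```

    Calling can_connect with the server's address and port 9090 from the client machine gives a quick yes/no on reachability before any Thrift code is involved.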

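    On question 2: network performance
    In my POC on an Alibaba Cloud server, switching from HTTP to Thrift RPC made client-side data retrieval about 20% faster; the server-side cost was about the same overall.

    Data: the POC server has 2M of bandwidth; fetching 14,100 rows by 14 columns takes just under 2 seconds, whereas the original REST API approach took a little over 2 seconds.
    Note: both use pickle + gzip (serialize to binary on the backend, then compress) and ClickHouse (backend data source), on the same server.

    To summarize: after various performance POCs (gzip, zlib and others, different compression levels, chunked compression), my preliminary conclusion is that performance depends on three things: (1) bandwidth, which matters more the larger the payload; (2) compression; (3) backend data retrieval.
    Leaving bandwidth aside, on the backend compression accounts for 50-80% of the time, while the database query is moderate at roughly 10-20%.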
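
    To reproduce this kind of comparison on your own data, a minimal timing sketch could look like the following. The rows are synthetic stand-ins for the real minute-bar data (an assumption), so the absolute numbers will differ from the measurements reported in this article; measure is a hypothetical helper name.

```python
import gzip
import pickle
import time

# Synthetic rows standing in for the real minute-bar data (an assumption).
rows = [
    {"code": 1000001, "open": 11.0 + (i % 100) / 100, "volume": 110000 + i}
    for i in range(20000)
]

raw = pickle.dumps(rows, protocol=pickle.HIGHEST_PROTOCOL)


def measure(level: int) -> tuple:
    """Compress raw at the given gzip level; return (size_in_bytes, seconds)."""
    start = time.perf_counter()
    packed = gzip.compress(raw, compresslevel=level)
    elapsed = time.perf_counter() - start
    return len(packed), elapsed


results = {level: measure(level) for level in (1, 6, 9)}

print(f"pickle size: {len(raw)} bytes")
for level, (size, seconds) in results.items():
    print(f"gzip level {level}: {size} bytes, {seconds:.4f} s")
```

    Comparing size against seconds per level shows where extra compression stops paying for itself on a given bandwidth.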

    Measured server-side data:

    data_list length:  14100
    response_mode : unstream
    fetch min data  : 14100 
    ---------------------------------------cost time analysis---------------------------------------
    
                                    reqeust -> get_params[t1-t0]  cost time : 0.002 seconds! percent : 0.550 %
                              fetch data from clickhouse [t2-t1]  cost time : 0.096 seconds! percent : 28.430 %
                                           list -> _dict [t7-t3]  cost time : 0.041 seconds! percent : 12.012 %
                                   dict -> pickle binary [t8-t7]  cost time : 0.008 seconds! percent : 2.445 %
                              pickle binary ->  compress [t9-t8]  cost time : 0.192 seconds! percent : 56.547 %
                                                           other  cost time : 0.000 seconds! percent : 0.016 %
                                                           total  cost time : 0.339 seconds! percent : 100.000 %
    -------------------------------------cost time analysis------------------------------------------
    
                                   data_list from clickhouse [0]  size :          127000      bytes!
                                                       _dict [3]  size :             640      bytes!
                                   dict -> pickle data_binary[4]  size :         2101853      bytes!
    ---------------------------------------[fetch min data end]---------------------------------------------
    
    
    

    I also tried various other compression and serialization schemes, but could not improve on this baseline.

    Appendix: an example RPC call:

    call rpc rsp _msg undecompress -> size : 695690 bytes! 
    call rpc rsp msg decompress -> size : 1471526 bytes! 
    _msg_decompress pickle type :<class 'pandas.core.frame.DataFrame'> df.shape (12900, 14)
    call req cost : 1.9182627201080322 seconds
          code       open      close        low       high         volume         money    factor  hight_limit  low_limit        avg  pre_close    paused  open_interest
    0  1000001  11.504129  11.885328  11.193967  11.747972  110000.061648  1.100001e+06  1.834888    11.124524  11.819673  11.995430  11.165699  1.889763      11.624358
    1  1000001  11.263888  11.220592  11.271315  11.488682  110000.799924  1.100001e+06  1.557382    11.413308  11.637640  11.808874  11.039294  1.483113      11.192772
    2  1000001  11.980175  11.307286  11.389151  11.714876  110000.735357  1.100000e+06  1.859282    11.323002  11.218972  11.185324  11.438151  1.941566      11.591046
    3  1000001  11.728164  11.533206  11.356698  11.460819  110000.920400  1.100000e+06  1.665206    11.368569  11.093051  11.523135  11.038568  1.188286      11.983874
    4  1000001  11.019227  11.173405  11.490653  11.934057  110000.188601  1.100000e+06  1.808952    11.413177  11.658708  11.419425  11.821609  1.506192      11.890046
    

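    IV. Production environment

    All I can say is that networking really matters. One misstep and it is hard to climb back out; none of the guides I read helped, so I had to ask Alibaba Cloud support for help. The problem was resolved afterwards. Thank you!

    1. Alibaba Cloud security group settings
    Reportedly, binary transfer requires ICMP to be opened, but I have not confirmed this.

    2. Below is a .thrift file modeled on the one in jqdatasdk, with say_hello added. Let's call it data_service.thrift.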

    enum DataType {
        Serial = 1,
        DataFrame = 2,
        Panel = 3
    }
    
    struct St_Query_Rsp {
        1:required bool status,
        2:optional string msg,
        5:optional string error
    }
    struct St_Query_Req {
        1:required string method_name,
        2:required binary params,
    }
    
    service DataService {
        St_Query_Rsp query(1:St_Query_Req req),
        St_Query_Rsp auth(1:string username, 2:string password, 5:bool compress, 8:string mac, 10:string version),
        St_Query_Rsp auth_by_token(1: string token)
        string say_hello(1: i32 id,2: string name)
    }
    
    

    3. Server-side POC code:

    import thriftpy2
    from thriftpy2.rpc import make_server
    import json

    from thriftpy2.transport.buffered import TBufferedTransportFactory
    from thriftpy2.transport.framed import TFramedTransportFactory


    data_thrift = thriftpy2.load("data_service.thrift", module_name="data_thrift")
    # struct St_Query_Rsp {
    #     1:required bool status,
    #     2:optional string msg,
    #     5:optional string error
    # }
    # struct St_Query_Req {
    #     1:required string method_name,  # e.g. get_price
    #     2:required binary params,       # can be pickle bytes
    # }
    from data_thrift import St_Query_Rsp
    # Passwords are stored as strings because the IDL declares password as string
    _accounts = {"test01": "8888", "test02": "6666"}

    class Data_ServiceHandler:
        def query(self, req):
            msg = {
                'city': 'shanghai',
                'population': 25000000,
                'female': 13000000,
                'male': 12000000
            }
            print("query------------------>")
            json_msg = json.dumps(msg)
            if req.method_name == "fetch_minute_data":
                return St_Query_Rsp(status=True, msg=json_msg, error="none")
            else:
                # Unknown method: report failure instead of status=True
                return St_Query_Rsp(status=False, msg="everything is bad", error="method is wrong!")

        # The IDL declares five arguments for auth, and the handler receives all of them
        def auth(self, username, password, compress=False, mac="", version=""):
            print("auth----------------->")
            if username in _accounts and password == _accounts[username]:
                tok = "i_am_right_token"
                status = True
                msg = tok
                error = ""
            else:
                tok = "i_am_bad_token"
                status = False
                msg = tok
                error = "username is not in accounts, or password is not right!"
            return St_Query_Rsp(status=status, msg=msg, error=error)

        def auth_by_token(self, token):  # TODO: placeholder check, to be fixed
            print("auth_by_token------------------->")
            if token != "":
                tok = "i_am_right_token"
                status = True
                msg = tok
                error = ""
            else:
                tok = "i_am_bad_token"
                status = False
                msg = tok
                error = "token is empty!"
            return St_Query_Rsp(status=status, msg=msg, error=error)

        def say_hello(self, id, name):
            return f'id : {id}, name : {name} hello! welcome data api! '

    # St_Query_Rsp query(1:St_Query_Req req),
    # St_Query_Rsp auth(1:string username, 2:string password, 5:bool compress, 8:string mac, 10:string version),
    # St_Query_Rsp auth_by_token(1: string token)
    print("Starting thrift server..........")
    port = 9090
    host = '0.0.0.0'  # listen on all interfaces, see the note on question 1
    server = make_server(data_thrift.DataService, Data_ServiceHandler(), host, port, client_timeout=5000)
    server.serve()
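
    The query handler in this POC returns a fixed payload. One way it could dispatch on method_name and decode the binary params field is sketched below. This is only an illustration under stated assumptions: the dataclasses QueryReq and QueryRsp are stand-ins for the thriftpy2-generated St_Query_Req and St_Query_Rsp, and fetch_minute_data and the METHODS table are hypothetical.

```python
import pickle
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class QueryReq:  # stand-in for St_Query_Req
    method_name: str
    params: bytes


@dataclass
class QueryRsp:  # stand-in for St_Query_Rsp
    status: bool
    msg: Optional[str] = None
    error: Optional[str] = None


def fetch_minute_data(code: str, count: int) -> str:
    """Hypothetical backend call; a real one would query the database."""
    return f"{count} minute bars for {code}"


# Only methods listed here can be called remotely.
METHODS: Dict[str, Callable[..., str]] = {"fetch_minute_data": fetch_minute_data}


def query(req: QueryReq) -> QueryRsp:
    """Look up req.method_name and call it with the unpickled keyword params."""
    func = METHODS.get(req.method_name)
    if func is None:
        return QueryRsp(status=False, error=f"unknown method: {req.method_name}")
    try:
        kwargs = pickle.loads(req.params)  # only unpickle data from trusted clients
        return QueryRsp(status=True, msg=func(**kwargs))
    except Exception as exc:  # report the failure to the caller instead of crashing
        return QueryRsp(status=False, error=str(exc))


ok = query(QueryReq("fetch_minute_data", pickle.dumps({"code": "1000001", "count": 5})))
bad = query(QueryReq("get_price", pickle.dumps({})))
print(ok)
print(bad)
```

    Keeping an explicit table of callable methods means a client cannot invoke arbitrary server functions by name.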
    
    

    4. Client:

    import sys
    # sys.path.append('/home/songroom/thrift_client/gen-py')  # adjust to your own path
    # sys.path.append('/home/dataapi/rpc_server/server/gen-py/')
    import thriftpy2
    from thriftpy2.rpc import make_client
    import pickle
    import msgpack
    import json
    from thriftpy2.protocol import binary as proto
    from thriftpy2.transport.buffered import TBufferedTransportFactory
    from thriftpy2.transport.framed import TFramedTransportFactory


    def get_http_uri(host, scheme, port, path):
        # e.g. host = 'xxx', scheme = 'http', port = 5000, path = '/tf_service'
        # HTTP_URI = '{scheme}://{host}:{port}{path}'
        http_uri = "{}://{}:{}{}".format(scheme, host, port, path)  # 'http://xxx:5000/tf_service'
        return http_uri

    host   = "48.123.10.1"
    scheme = "http"
    port   = 9090
    path   = ""
    http_uri = get_http_uri(host, scheme, port, path)
    print(f"http_uri : {http_uri}")

    client_thrift = thriftpy2.load("data_service.thrift", module_name="client_thrift")
    client = make_client(client_thrift.DataService, host, port, timeout=6000)

    # make_client signature, for reference:
    # def make_client(service, host="localhost", port=9090, unix_socket=None,
    #                 proto_factory=TBinaryProtocolFactory(),
    #                 trans_factory=TBufferedTransportFactory(),
    #                 timeout=3000, cafile=None, ssl_context=None, certfile=None,
    #                 keyfile=None, url="", socket_family=socket.AF_INET):


    import client_thrift
    print("call rpc: say_hello() ---->")
    id   = 42
    name = 'wowotuo!'
    rpc_msg  = client.say_hello(id, name)
    print(f"say hello: get rpc_msg {rpc_msg} ")
    print("call rpc: query() ---->")
    params = {
        'city': 'shanghai',
        'population': 25000000,
        'female': 13000000,
        'male': 12000000
    }

    _pickle = pickle.dumps(params)  # alternative: msgpack
    _msgpack = msgpack.packb(params)
    # Errors at this point may be caused by a protocol mismatch between the Thrift server and client
    print(f"_pickle type : {type(_pickle)}")
    print(f"_msgpack type :{type(_msgpack)}")
    req = client_thrift.St_Query_Req()
    req.method_name = "get_price"
    req.params = _pickle  # or msgpack.packb(params); either way the value is bytes
    print(f"req : {req}")
    rpc_rsp = client.query(req)
    print(f"call rpc req ->rsp msg : {rpc_rsp.msg} ")
    

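    5. Transferring the IDL binary type, and compression

    For now, you can use msgpack or pickle to produce a bytes object, which matches the binary type. Overall I find pickle slightly better than msgpack.
    After serializing to binary I compress with gzip (level 6 is enough). I also compared zlib; the two are about the same overall, and I lean towards gzip. Compression makes the binary payload smaller, which helps network transfer.
    To be more fine-grained: for dict-like and JSON-like parameters on the client side, msgpack may be slightly better; for larger data on the server side, prefer pickle.

    import datetime
    import json

    import pandas as pd

    # The JSON route is optional
    class PdEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, pd.Timestamp):
                return str(obj)
            return json.JSONEncoder.default(self, obj)

    # msgpack hooks; assumes msgpack >= 1.0, where keys and strings unpack as str by default
    def decode_datetime(obj):
        if '__datetime__' in obj:
            obj = datetime.datetime.strptime(obj['as_str'], "%Y%m%dT%H:%M:%S.%f")
        return obj

    def encode_datetime(obj):
        if isinstance(obj, datetime.datetime):
            obj = {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
        return obj

    # Usage
    # msg_binary = msgpack.packb(dict_data, default=encode_datetime)  # msgpack pack
    # this_dict_again = msgpack.unpackb(msg_binary, object_hook=decode_datetime)  # msgpack unpack
    # msg_binary = pickle.dumps(df)  # I pickle the pandas.DataFrame directly; a DataFrame works slightly better than dict or list.

    # On top of the binary payload you can also apply gzip compression, which shrinks the transfer size and works a bit better.

    Note that I have not yet compared the pandas and DataTable formats.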
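
    The pickle-then-gzip path described in this section can be sketched as a pair of helpers. This is a minimal illustration using only the standard library; pack_payload and unpack_payload are hypothetical names, and the zlib line is included only to compare sizes.

```python
import gzip
import pickle
import zlib


def pack_payload(obj, level: int = 6) -> bytes:
    """Serialize obj with pickle, then gzip it; the result fits a Thrift binary field."""
    return gzip.compress(pickle.dumps(obj), compresslevel=level)


def unpack_payload(data: bytes):
    """Reverse pack_payload. Only use on data from a trusted peer: unpickling can run code."""
    return pickle.loads(gzip.decompress(data))


params = {"city": "shanghai", "population": 25000000, "female": 13000000, "male": 12000000}
payload = pack_payload(params)
restored = unpack_payload(payload)

# gzip and zlib both use DEFLATE underneath, so their sizes differ only by framing overhead.
zlib_size = len(zlib.compress(pickle.dumps(params), 6))
print(type(payload), len(payload), zlib_size, restored == params)
```

    On the client, the bytes returned by pack_payload can be assigned directly to req.params, and the server can apply unpack_payload to the same field.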
    
    

    V. Output

    root@DESKTOP-MEDPUTU:/home/songroom/thrift_client# python client.py
    http_uri : http://48.123.10.1:9090
    call rpc: say_hello() ---->
    say hello: get rpc_msg id : 42, name : wowotuo! hello! welcome data api!  
    call rpc: query() ---->
    _pickle type : <class 'bytes'>
    _msgpack type :<class 'bytes'>
    req : St_Query_Req(method_name='get_price', params=b'\x80\x04\x95C\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x04city\x94\x8c\x08shanghai\x94\x8c\npopulation\x94J@x}\x01\x8c\x06female\x94J@]\xc6\x00\x8c\x04male\x94J\x00\x1b\xb7\x00u.')
    call rpc req ->rsp msg : everything is bad 
    root@DESKTOP-MEDPUTU:/home/songroom/thrift_client# 
    
  • Original article: https://blog.csdn.net/wowotuo/article/details/126452503