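Remote procedure calls (RPC) can transfer data more efficiently than an HTTP web API, so I tried out an RPC-based approach.
For learning the Thrift protocol, the best material is the dedicated libraries on GitHub; reading their examples is enough (https://github.com/Thriftpy/thriftpy2).
I ran this POC on an Alibaba Cloud server and hit a few pitfalls, which I share here for reference: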
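1. The socket could not connect
2. Transferring the binary type
3. Security group settings
Issues 1-3 have all been solved; see the answers later in this post.
Related libraries: thrift (Apache Thrift's official Python library, used in Method 1) or thriftpy2 (used in Method 2)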
I. Method 1: the thrift library (Apache Thrift, with generated gen-py code)
1. The tf_service.thrift file
service Min_Bar_Service {
    string min_bar(1: i32 id,
                   2: string context)
}
2. Server:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
# Adjust these paths to your own environment.
sys.path.append("/home/dataapi/.local/lib/python3.8/site-packages")
# Append the gen-py directory itself (not gen-py/tf_service) so that the
# generated package tf_service is importable, as in the client below.
sys.path.append('/home/dataapi/rpc_server/server/gen-py')
from thrift.protocol import TBinaryProtocol
from thrift.server import THttpServer
from tf_service import Min_Bar_Service
class Min_Bar_ServiceHandler:
    def min_bar(self, id, context):
        return context + ', id: ' + str(id)
handler = Min_Bar_ServiceHandler()
processor = Min_Bar_Service.Processor(handler)
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
# Serve Thrift over HTTP: (processor, (host, port), protocol factory).
server = THttpServer.THttpServer(processor, ('127.0.0.1', 9090), pfactory)
# For comparison, thriftpy's THttpServer takes a similar argument list:
# (processor, server_address, iprot_factory, server_class=http_server.HTTPServer)
# https://github.com/Thriftpy/thriftpy/blob/0e606f82a3c900e663b63d69f68fc304c5d58dee/thriftpy/http.py
print('Starting the server (THttpServer mode)...')
server.serve()  # blocks until the process is interrupted
print('done.')
3. Client:
import sys
# Adjust to your own path: the directory that contains the generated tf_service package.
#sys.path.append('/home/songroom/thrift_client/gen-py')
sys.path.append('/home/dataapi/rpc_server/server/gen-py/')
from tf_service import Min_Bar_Service
from thrift.transport import THttpClient
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
def get_http_uri(host, scheme, port, path):
    # e.g. host = 'xxx', scheme = 'http', port = 5000, path = '/tf_service'
    # -> 'http://xxx:5000/tf_service'
    return "{}://{}:{}{}".format(scheme, host, port, path)
# >>> get_http_uri("47.122.10.1", "http", 5000, "/tf_service")
# 'http://47.122.10.1:5000/tf_service'
host = "127.0.0.1"  # or the server's public IP, e.g. "48.123.10.1"
scheme = "http"
port = "9090"
path = ""
http_uri = get_http_uri(host, scheme, port, path)
print(f"http_uri : {http_uri}")
transport = THttpClient.THttpClient(http_uri)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Min_Bar_Service.Client(protocol)
# Connect!
transport.open()
id = 42
context = 'welcome to thrift rpc dataapi! '
msg = client.min_bar(id, context)
print(msg)
transport.close()
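4. Generate the gen-py folder on both the client and the server
For example, I defined the interface in the tf_service.thrift file. First keep a copy of this file on both the server and the client, then run the following on both sides: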
thrift -gen py tf_service.thrift
5. Start server.py, then client.py, and that's it.
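II. Method 2: thriftpy2
thriftpy2 continues the thriftpy project. It parses the .thrift file at runtime, so no gen-py step is needed and the code is more concise; otherwise the approach is much the same.
The .thrift file is still needed on both the client and the server, though, because it defines the types used for data exchange.
Generally speaking, this library is the more convenient choice nowadays: smooth and concise.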
Server:
import thriftpy2
from thriftpy2.rpc import make_server
# thriftpy2 requires module_name to end with "_thrift".
tf_thrift = thriftpy2.load("tf_service.thrift", module_name="tf_thrift")
class Min_Bar_ServiceHandler:
    def min_bar(self, id, context):
        return context + ', id: ' + str(id)
server = make_server(tf_thrift.Min_Bar_Service, Min_Bar_ServiceHandler(), '127.0.0.1', 9090)
server.serve()
Client:
import thriftpy2
from thriftpy2.rpc import make_client
tf_thrift = thriftpy2.load("tf_service.thrift", module_name="tf_thrift")
client = make_client(tf_thrift.Min_Bar_Service, '127.0.0.1', 9090)
id = 42
context = 'welcome to thrift rpc dataapi! '
msg = client.min_bar(id, context)
print(msg)
client.close()
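III. Questions
1. How can this be opened up to web users the way a web API is? Is it enough to change host = "127.0.0.1" #"48.123.10.1"?
2. How much faster is this than a web API over HTTP?
On question 1: network settings.
On the server side in production, set host to "0.0.0.0" so that the server listens on all network interfaces. Do not set it to the server's public IP "48.123.10.1" (on a cloud server the public IP is usually mapped by NAT rather than assigned to the instance's own network interface), nor to "127.0.0.1", which only accepts connections from the same machine. Pay special attention to this; otherwise the socket will seem permanently unreachable, even after you have configured every inbound and outbound security group rule and double-checked the firewall.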
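Before suspecting Thrift, it can help to check from the client machine whether the port is reachable at all; this separates a binding problem from a security-group or firewall problem. A minimal sketch using only the standard socket module; can_connect is a hypothetical helper name, not part of thrift or thriftpy2:

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (socket.timeout is an OSError) and DNS errors.
        return False


if __name__ == "__main__":
    # A listener bound to 0.0.0.0 accepts connections on every interface,
    # including loopback; one bound to 127.0.0.1 accepts only local connections.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 0))  # port 0: let the OS pick a free port
    server.listen()
    port = server.getsockname()[1]
    print(can_connect("127.0.0.1", port))  # True while the listener is open
    server.close()
    print(can_connect("127.0.0.1", port))  # False once nothing listens there
```

Run on the client as can_connect("48.123.10.1", 9090): a False result points at binding, security group or firewall settings rather than at the RPC code.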
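On question 2: network performance.
My POC ran on an Alibaba Cloud server. After switching from HTTP to Thrift RPC, the client fetched data about 20% faster; the server side took about the same time overall.
Data: the POC server had 2 Mbps of bandwidth; fetching 14,100 rows by 14 columns took a little under 2 seconds, versus just over 2 seconds with the original REST API.
Note: both setups used pickle + gzip (serialize to binary on the backend, then compress) and fetched data from ClickHouse, on the same server.
To sum up, after various performance POCs (gzip, zlib, different compression levels, chunked compression), my preliminary conclusion is that performance depends on three parts: (1) bandwidth, which matters more the larger the payload; (2) compression; (3) fetching data on the backend.
Setting bandwidth aside, on the backend compression takes 50-80% of the time, while fetching from the database is modest at roughly 10-20%.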
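Chunked compression, one of the variants tried in this POC, lets the server compress a payload piece by piece instead of in one call. A minimal standard-library sketch, assuming the result should still be an ordinary gzip stream; gzip_chunks is a hypothetical helper, not taken from the POC code:

```python
import gzip
import zlib
from typing import Iterable, Iterator


def gzip_chunks(chunks: Iterable[bytes], level: int = 6) -> Iterator[bytes]:
    """Compress an iterable of byte chunks into a single gzip stream, piece by piece."""
    # wbits=31 (16 + 15) asks zlib to write a gzip header and trailer.
    comp = zlib.compressobj(level, zlib.DEFLATED, 31)
    for chunk in chunks:
        out = comp.compress(chunk)
        if out:  # zlib may buffer input and return b"" for small chunks
            yield out
    yield comp.flush()  # remaining compressed data plus the gzip trailer


if __name__ == "__main__":
    data = b"2023-01-01,1000001,11.50,11.88\n" * 20000
    pieces = [data[i:i + 65536] for i in range(0, len(data), 65536)]
    stream = b"".join(gzip_chunks(pieces))
    assert gzip.decompress(stream) == data
    print(len(data), len(stream))
```

The concatenated output decompresses with plain gzip.decompress, so the receiving side does not need to know it was produced in chunks.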
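A stage-by-stage breakdown like the one measured here can be reproduced for any payload with time.perf_counter. This is a minimal sketch, assuming the pickle + gzip pipeline described above; profile_pipeline is a hypothetical helper, and the synthetic rows only stand in for the ClickHouse query result, so the shares it reports will differ from the author's measurements.

```python
import gzip
import pickle
import time


def profile_pipeline(data, level: int = 6) -> dict:
    """Time the serialize and compress stages separately and report the sizes."""
    t0 = time.perf_counter()
    raw = pickle.dumps(data)                           # stage 1: object -> bytes
    t1 = time.perf_counter()
    packed = gzip.compress(raw, compresslevel=level)   # stage 2: bytes -> gzip bytes
    t2 = time.perf_counter()
    total = (t2 - t0) or 1e-12  # guard against division by zero on very fast runs
    return {
        "pickle_s": t1 - t0,
        "gzip_s": t2 - t1,
        "gzip_share": (t2 - t1) / total,
        "raw_bytes": len(raw),
        "gzip_bytes": len(packed),
    }


if __name__ == "__main__":
    # Hypothetical stand-in for the query result: 14,100 rows x 14 float columns.
    rows = [[i * 0.001 + j for j in range(14)] for i in range(14100)]
    for key, value in profile_pipeline(rows).items():
        print(f"{key:>10}: {value}")
```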
Measured server-side data:
data_list length: 14100
response_mode : unstream
fetch min data : 14100
---------------------------------------cost time analysis---------------------------------------
reqeust -> get_params[t1-t0] cost time : 0.002 seconds! percent : 0.550 %
fetch data from clickhouse [t2-t1] cost time : 0.096 seconds! percent : 28.430 %
list -> _dict [t7-t3] cost time : 0.041 seconds! percent : 12.012 %
dict -> pickle binary [t8-t7] cost time : 0.008 seconds! percent : 2.445 %
pickle binary -> compress [t9-t8] cost time : 0.192 seconds! percent : 56.547 %
other cost time : 0.000 seconds! percent : 0.016 %
total cost time : 0.339 seconds! percent : 100.000 %
-------------------------------------cost time analysis------------------------------------------
data_list from clickhouse [0] size : 127000 bytes!
_dict [3] size : 640 bytes!
dict -> pickle data_binary[4] size : 2101853 bytes!
---------------------------------------[fetch min data end]---------------------------------------------
I also tried various compression and serialization schemes, but none improved on this result.
Appendix: sample RPC call output:
call rpc rsp _msg undecompress -> size : 695690 bytes!
call rpc rsp msg decompress -> size : 1471526 bytes!
_msg_decompress pickle type :<class 'pandas.core.frame.DataFrame'> df.shape (12900, 14)
call req cost : 1.9182627201080322 seconds
code open close low high volume money factor hight_limit low_limit avg pre_close paused open_interest
0 1000001 11.504129 11.885328 11.193967 11.747972 110000.061648 1.100001e+06 1.834888 11.124524 11.819673 11.995430 11.165699 1.889763 11.624358
1 1000001 11.263888 11.220592 11.271315 11.488682 110000.799924 1.100001e+06 1.557382 11.413308 11.637640 11.808874 11.039294 1.483113 11.192772
2 1000001 11.980175 11.307286 11.389151 11.714876 110000.735357 1.100000e+06 1.859282 11.323002 11.218972 11.185324 11.438151 1.941566 11.591046
3 1000001 11.728164 11.533206 11.356698 11.460819 110000.920400 1.100000e+06 1.665206 11.368569 11.093051 11.523135 11.038568 1.188286 11.983874
4 1000001 11.019227 11.173405 11.490653 11.934057 110000.188601 1.100000e+06 1.808952 11.413177 11.658708 11.419425 11.821609 1.506192 11.890046
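IV. Production environment
All I can say is that networking really matters: one careless step and it is hard to climb back out. None of the guides I found helped, so in the end I asked Alibaba Cloud support for help, and the problem was solved. Thanks!
1. Alibaba Cloud security group settings
At minimum, the security group needs an inbound rule allowing TCP on the server port (9090 here). Reportedly, binary transfers also require ICMP to be allowed, but I have not confirmed this.
2. Below is a .thrift file modeled on the one in jqdatasdk, with a say_hello method added. Let's call it data_service.thrift: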
enum DataType {
    Serial = 1,
    DataFrame = 2,
    Panel = 3
}
struct St_Query_Rsp {
    1: required bool status,
    2: optional string msg,
    5: optional string error
}
struct St_Query_Req {
    1: required string method_name,
    2: required binary params,
}
service DataService {
    St_Query_Rsp query(1: St_Query_Req req),
    St_Query_Rsp auth(1: string username, 2: string password, 5: bool compress, 8: string mac, 10: string version),
    St_Query_Rsp auth_by_token(1: string token)
    string say_hello(1: i32 id, 2: string name)
}
3. Server-side POC code:
import json
import thriftpy2
from thriftpy2.rpc import make_server
data_thrift = thriftpy2.load("data_service.thrift", module_name="data_thrift")
# struct St_Query_Rsp {
#     1: required bool status,
#     2: optional string msg,
#     5: optional string error
# }
# struct St_Query_Req {
#     1: required string method_name,  # e.g. get_price
#     2: required binary params,        # can carry pickle-encoded bytes
# }
# thriftpy2.load registers the module under module_name, so it can be imported.
from data_thrift import St_Query_Rsp
# Demo accounts; passwords are strings because the IDL declares password as string.
_accounts = {"test01": "8888", "test02": "6666"}
class Data_ServiceHandler:
    def query(self, req):
        msg = {
            'city': 'shanghai',
            'population': 25000000,
            'female': 13000000,
            'male': 12000000
        }
        print("query------------------>")
        json_msg = json.dumps(msg)
        if req.method_name == "fetch_minute_data":
            return St_Query_Rsp(status=True, msg=json_msg, error="none")
        else:
            # Any other method name (e.g. the client's "get_price") gets this reply.
            return St_Query_Rsp(status=True, msg="everything is bad", error="method is wrong!")
    # thriftpy2 passes every argument declared in the IDL, so accept all five.
    def auth(self, username, password, compress=False, mac="", version=""):
        print("auth----------------->")
        if username in _accounts and password == _accounts[username]:
            status, msg, error = True, "i_am_right_token", ""
        else:
            status, msg, error = False, "i_am_bad_token", "username is not in accounts, or password is not right!"
        return St_Query_Rsp(status=status, msg=msg, error=error)
    def auth_by_token(self, token):  # TODO: real token validation
        print("auth------------------->")
        if token != "":
            status, msg, error = True, "i_am_right_token", ""
        else:
            status, msg, error = False, "i_am_bad_token", "token is empty!"
        return St_Query_Rsp(status=status, msg=msg, error=error)
    def say_hello(self, id, name):
        return f'id : {id}, name : {name} hello! welcome data api! '
# Service methods from data_service.thrift:
# St_Query_Rsp query(1:St_Query_Req req),
# St_Query_Rsp auth(1:string username, 2:string password, 5:bool compress, 8:string mac, 10:string version),
# St_Query_Rsp auth_by_token(1: string token)
print("Starting thrift server..........")
port = 9090
host = '0.0.0.0'  # listen on all interfaces (see question 1 above)
server = make_server(data_thrift.DataService, Data_ServiceHandler(), host, port, client_timeout=5000)
server.serve()
4. Client:
import pickle
import msgpack
import thriftpy2
from thriftpy2.rpc import make_client
def get_http_uri(host, scheme, port, path):
    # e.g. -> 'http://xxx:5000/tf_service'
    return "{}://{}:{}{}".format(scheme, host, port, path)
host = "48.123.10.1"
scheme = "http"
port = 9090
path = ""
http_uri = get_http_uri(host, scheme, port, path)
# Printed for reference only: make_client below talks to host:port over a plain
# TCP socket (buffered transport, binary protocol by default), not over HTTP.
print(f"http_uri : {http_uri}")
client_thrift = thriftpy2.load("data_service.thrift", module_name="client_thrift")
client = make_client(client_thrift.DataService, host, port, timeout=6000)  # timeout in ms
# thriftpy2's make_client signature, for reference:
# def make_client(service, host="localhost", port=9090, unix_socket=None,
#                 proto_factory=TBinaryProtocolFactory(),
#                 trans_factory=TBufferedTransportFactory(),
#                 timeout=3000, cafile=None, ssl_context=None, certfile=None,
#                 keyfile=None, url="", socket_family=socket.AF_INET):
print("call rpc: say_hello() ---->")
id = 42
name = 'wowotuo!'
rpc_msg = client.say_hello(id, name)
print(f"say hello: get rpc_msg {rpc_msg} ")
print("call rpc: query() ---->")
params = {
    'city': 'shanghai',
    'population': 25000000,
    'female': 13000000,
    'male': 12000000
}
_pickle = pickle.dumps(params)
_msgpack = msgpack.packb(params)
# Both are bytes, which the IDL binary field expects. (Errors at this stage may
# come from a protocol mismatch between the thrift server and client.)
print(f"_pickle type : {type(_pickle)}")
print(f"_msgpack type :{type(_msgpack)}")
req = client_thrift.St_Query_Req()
req.method_name = "get_price"
req.params = _pickle  # or msgpack.packb(params); either packs params into binary
print(f"req : {req}")
rpc_rsp = client.query(req)
print(f"call rpc req ->rsp msg : {rpc_rsp.msg} ")
client.close()
5. Transferring the IDL binary type, and compression
For now, msgpack or pickle can be used to produce bytes, which matches the binary type. Overall, I find pickle slightly better than msgpack.
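After serializing to binary, I compress with gzip (level 6 is enough). I also compared zlib; overall they performed about the same, and I lean toward gzip. Compression makes the binary smaller, which is better for network transfer.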
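The finding that gzip and zlib perform about the same is expected: both use the same DEFLATE algorithm and differ only in the header and checksum wrapped around the compressed data (gzip adds at least 18 bytes, zlib 6). A quick standard-library check on a synthetic dict, which stands in for real query results:

```python
import gzip
import pickle
import zlib

payload = pickle.dumps({"row_%d" % i: [i, i * 0.5, "shanghai"] for i in range(5000)})

gz = gzip.compress(payload, compresslevel=6)
zl = zlib.compress(payload, 6)

# Same DEFLATE settings; only the wrapper differs
# (gzip: 10-byte header + 8-byte trailer, zlib: 2-byte header + 4-byte checksum).
print(len(payload), len(gz), len(zl))
assert gzip.decompress(gz) == zlib.decompress(zl) == payload
```

So the choice between the two is mostly about what the receiving side expects; the compression ratio and CPU cost are essentially the same at a given level.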
To be more fine-grained: for dict- or JSON-like parameters on the client side, msgpack may be slightly better; for larger data on the server side, prefer pickle.
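The pickle + gzip combination can be wrapped in two small helpers whose output is plain bytes, which is what a Thrift binary field such as St_Query_Req.params holds in Python. A minimal sketch; to_binary_field and from_binary_field are hypothetical names, and msgpack.packb / msgpack.unpackb could replace the pickle calls for small dict-like parameters. Only unpickle data from a trusted peer, since a crafted pickle can execute arbitrary code.

```python
import gzip
import pickle
from typing import Any


def to_binary_field(obj: Any, level: int = 6) -> bytes:
    """Serialize with pickle, then gzip; the result is plain bytes for a binary field."""
    return gzip.compress(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL), compresslevel=level)


def from_binary_field(blob: bytes) -> Any:
    """Reverse of to_binary_field. Use only on bytes from a trusted sender."""
    return pickle.loads(gzip.decompress(blob))


if __name__ == "__main__":
    params = {"city": "shanghai", "population": 25000000}
    blob = to_binary_field(params)
    print(type(blob), len(blob))
    assert from_binary_field(blob) == params
```

On the client, req.params = to_binary_field(params) would fill the request; the server would call from_binary_field(req.params) before dispatching on req.method_name.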
# The JSON approach is optional. These helpers handle pandas Timestamps (JSON)
# and datetime objects (msgpack), which neither format supports natively.
import datetime
import json
import pandas as pd
class PdEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, pd.Timestamp):
            return str(obj)
        return super().default(obj)
def decode_datetime(obj):
    # object_hook for msgpack.unpackb (msgpack >= 1.0 returns str keys by default)
    if '__datetime__' in obj:
        obj = datetime.datetime.strptime(obj['as_str'], "%Y%m%dT%H:%M:%S.%f")
    return obj
def encode_datetime(obj):
    # default hook for msgpack.packb
    if isinstance(obj, datetime.datetime):
        obj = {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
    return obj
# Usage:
# msg_binary = msgpack.packb(dict_data, default=encode_datetime)               # msgpack encode
# this_dict_again = msgpack.unpackb(msg_binary, object_hook=decode_datetime)   # msgpack decode
# msg_binary = pickle.dumps(df)  # I pickle the pandas.DataFrame directly; df works slightly better than dict or list.
# On top of the binary, gzip compression shrinks the payload further, which helps transfer.
Note that I have not yet compared the pandas format with the DataTable format.
V. Output
root@DESKTOP-MEDPUTU:/home/songroom/thrift_client# python client.py
http_uri : http://48.123.10.1:9090
call rpc: say_hello() ---->
say hello: get rpc_msg id : 42, name : wowotuo! hello! welcome data api!
call rpc: query() ---->
_pickle type : <class 'bytes'>
_msgpack type :<class 'bytes'>
req : St_Query_Req(method_name='get_price', params=b'\x80\x04\x95C\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x04city\x94\x8c\x08shanghai\x94\x8c\npopulation\x94J@x}\x01\x8c\x06female\x94J@]\xc6\x00\x8c\x04male\x94J\x00\x1b\xb7\x00u.')
call rpc req ->rsp msg : everything is bad
root@DESKTOP-MEDPUTU:/home/songroom/thrift_client#