目录
window系统中必须安装有Visual Studio ,后面源码安装时需要进行C++编译
GitHub - vnpy/vnpy_ctp: VeighNa框架的CTP交易接口
下载zip压缩包
![]()
解压



要在python中能执行,要有.pyd文件,解压后的文件夹内没有.pyd文件
新建一个python项目,项目在一个新的虚拟环境中执行。
操作系统:window10 64位
开发工具:pycharm
python版本:python3.8.10


打开项目下方的terminal面板

cd 到解压后setup.py 所在文件夹

执行 python setup.py build

大概2到3分钟,编译完毕,在setup.py所在文件夹下多出一个build文件夹

在build下找到与操作系统和python版本对应的文件夹,以本文为例,操作系统是64位,那文件夹名称中就会有amd64,python版本3.8,文件夹名称中就会包含3.8,所以本文的文件夹名为lib.win-amd64-3.8
我们需要的是 build>lib.win-amd64-3.8>vnpy_ctp文件夹下的api文件夹,我们把api文件夹复制到项目目录下

创建Md调用的类和Td调用的类
- import datetime,sys,os,time,pytz
- from api import (
- TdApi,
- MdApi
- )
- class CtpMdApi(MdApi):
- def __init__(self)->None:
- super().__init__()
-
- self.reqid:int = 0
- self.connect_status: bool = False
- self.login_status: bool = False
- self.subscribed: set = set()
-
- self.userid: str = ""
- self.password: str = ""
- self.brokerid: str = ""
-
- self.current_date: str = datetime.date.today().strftime('%Y%m%d')
- pass
- def connect(self, address: str, userid: str, password: str, brokerid: str)->None:
- self.userid = userid
- self.password = password
- self.brokerid = brokerid
-
- if not self.connect_status:
- con_body_path = CTP_MD_CON_DIR
- con_body_path = con_body_path.replace('/','\\')
- self.createFtdcMdApi(con_body_path)
- self.registerFront(address)
- self.init()
-
- self.connect_status = True
- pass
- pass
-
- def login(self)->None:
- ctp_req: dict = {
- 'UserID':self.userid,
- 'Password':self.password,
- 'BrokerID':self.brokerid
- }
- self.reqid += 1
- self.reqUserLogin(ctp_req,self.reqid)
- pass
- def subscribe(self,req:dict):
- if self.login_status:
- self.subscribeMarketData(req['symbol'])
- self.subscribed.add(req['symbol'])
- pass
- def close(self)->None:
- if self.connect_status:
- self.exit()
- pass
- def update_date(self)->None:
- self.current_date = datetime.date.today().strftime('%Y%m%d')
- pass
- def onFrontConnected(self)->None:
- self.login()
- pass
- def onFrontDisconnected(self,reason:int)->None:
- self.login_status = False
- pass
- def onRspUserLogin(self,data:dict,error:dict,reqid:int,last:bool)->None:
- if not error['ErrorID']:
- self.login_status = True
- for symbol in self.subscribed:
- self.subscribeMarketData(symbol)
- pass
- else:
- print(f"行情服务器登录失败。{error['ErrorID']}.{error['ErrorMsg']}")
- pass
- def onRspError(self, error: dict, reqid: int, last: bool)->None:
- print('行情接口报错。', error['ErrorID'], error['ErrorMsg'])
- pass
- def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool):
- if not error or not error['ErrorID']:
- return
- print('行情订阅失败。',error['ErrorID'],error['ErrorMsg'])
- pass
- def onRtnDepthMarketData(self,data:dict)->None:
- if not data['UpdateTime']:
- return
- print('tick返回',data['InstrumentID'],data['LastPrice'])
- pass
- pass
-
- class CtpTdApi(TdApi):
- def __init__(self)->None:
- super().__init__()
- self.reqid: int = 0
- self.order_ref: int = 0
-
- self.connect_status: bool = False
- self.login_status: bool = False
- self.auth_status: bool = False
- self.login_failed: bool = False
- self.auth_failed: bool = False
- self.contract_inited: bool = False
-
- self.userid: str = ""
- self.password: str = ""
- self.brokerid: str = ""
- self.auth_code: str = ""
- self.appid: str = ""
-
- self.frontid: int = 0
- self.sessionid: int = 0
- pass
- def connect(self,address:str,userid:str,password:str,brokerid:str,auth_code:str,appid:str)->None:
- self.userid = userid
- self.password = password
- self.brokerid = brokerid
- self.auth_code = auth_code
- self.appid = appid
-
- if not self.connect_status:
- con_body_path = CTP_TD_CON_DIR + self.userid + os.path.sep
- if not os.path.exists(con_body_path):
- os.mkdir(con_body_path)
- con_body_path = con_body_path.replace('/','\\')
- self.createFtdcTraderApi(con_body_path)
-
- self.subscribePrivateTopic(0)
- self.subscribePublicTopic(0)
- self.registerFront(address)
- self.init()
-
- self.connect_status = True
- pass
- else:
- self.authenticate()
- pass
-
- def authenticate(self)->None:
- if self.auth_failed:
- return
- ctp_req: dict = {
- "UserID": self.userid,
- "BrokerID": self.brokerid,
- "AuthCode": self.auth_code,
- "AppID": self.appid
- }
-
- self.reqid += 1
- self.reqAuthenticate(ctp_req, self.reqid)
- pass
- def login(self)->None:
- if self.login_failed:
- return
- ctp_req: dict = {
- "UserID": self.userid,
- "Password": self.password,
- "BrokerID": self.brokerid,
- "AppID": self.appid
- }
-
- self.reqid += 1
- self.reqUserLogin(ctp_req, self.reqid)
- pass
- def close(self)->None:
- if self.connect_status:
- self.exit()
- pass
- def onFrontConnected(self)->None:
- if self.auth_code:
- self.authenticate()
- else:
- self.login()
- pass
- def onFrontDisconnected(self,reason:int)->None:
- self.login_status = False
- pass
- def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool)->None:
- if not error['ErrorID']:
- self.auth_status = True
- self.login()
- else:
- self.auth_failed = True
- print('交易服务器验证失败。',error['ErrorID'],error['ErrorMsg'])
- pass
- def onRspUserLogin(self,data: dict, error: dict, reqid: int, last: bool)->None:
- if not error["ErrorID"]:
- self.frontid = data["FrontID"]
- self.sessionid = data["SessionID"]
- self.login_status = True
-
- # 自动确认结算单
- ctp_req: dict = {
- "BrokerID": self.brokerid,
- "InvestorID": self.userid
- }
- self.reqid += 1
- self.reqSettlementInfoConfirm(ctp_req, self.reqid)
- else:
- self.login_failed = True
- print("交易服务器登录失败", error['ErrorID'], error['ErrorMsg'])
- pass
- def onRspSettlementInfoConfirm(self,data: dict, error: dict, reqid: int, last: bool)->None:
- while True:
- self.reqid += 1
- n: int = self.reqQryInstrument({}, self.reqid)
- if not n:
- break
- else:
- time.sleep(1)
- pass
- def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool) -> None:
- print(data['ProductClass'],data['InstrumentID'],data['ProductID'],reqid,last)
- if last:
- self.contract_inited = True
- print('合约信息查询完毕')
- pass
- pass
执行代码
- if __name__ == '__main__':
- investorid = ""
- brokerid=""
- password= ""
- appid= ""
- auth_code= "0000000000000000"
- md_ip= "180.168.146.187:10211"
- trader_ip= "180.168.146.187:10201"
-
- temp_api = CtpTdApi()
- address = f"tcp://{trader_ip}"
- temp_api.connect(address,investorid,password,brokerid,auth_code,appid)
-
- import keyboard
- keyboard.wait('esc')
- sys.exit()
- pass

能登录td服务器,并正常查询合约,可用