如果对某一股票K线图了解比较深刻,可以通过相似度找到与该股票相似的K线图,那么对这些相似的K线图,可以借鉴已经深刻剖析过的股票的方法,已达到事半功倍的效果。
K线图相似度的计算方法使用的是该博文中的方法。
目录

前置说明:
1. 必须以“excute_strategy”为方法名
2. 三个参数,参数1为股票代码股票名键值对;参数2为股票日数据对应目录;参数3为指数(也可以是个股)日数据文件路径
3. 策略代码写完后,保存为.py文件,将该py文件保存在一个目录(自定义)下,运行工具后,选择这个文件夹,程序会加载该目录下的所有py文件
- def excute_strategy(base_data,data_dir,index_data_dir):
- '''
- 股票K线图相似度
- 计算 2021-06-01 到 2022-01-01 之间的日K线
- :param base_data: 股票编码与股票名键值对
- :param data_dir: 股票日数据文件夹目录
- :param index_data_dir: 指数文件路径
- :return:
- '''
- import pandas as pd
- import numpy as np
- import os
-
- start_date_str = '2021-06-01'
- end_date_str = '2022-01-01'
- index_df = pd.read_csv(index_data_dir,encoding='utf-8')
- index_df['o_date'] = index_df['tradeDate']
- index_df['o_date'] = pd.to_datetime(index_df['o_date'])
- index_df = index_df.loc[(index_df['o_date']>=start_date_str) & (index_df['o_date']<=end_date_str)].copy()
- index_df = index_df.loc[:,['o_date','openIndex','closeIndex','highestIndex','lowestIndex']].copy()
-
- dailydata_file_list = os.listdir(data_dir)
-
- res_list = []
- for item in dailydata_file_list:
- item_arr = item.split('.')
- ticker = item_arr[0]
- secName = base_data[ticker]
- file_path = data_dir + item
- df = pd.read_csv(file_path,encoding='utf-8')
- # 删除停牌的数据
- df = df.loc[df['openPrice'] > 0].copy()
- df['o_date'] = df['tradeDate']
- df['o_date'] = pd.to_datetime(df['o_date'])
- df = df.loc[(df['o_date'] >= start_date_str) & (df['o_date']<=end_date_str)].copy()
- # 保存未复权收盘价数据
- df['close'] = df['closePrice']
- # 计算前复权数据
- df['openPrice'] = df['openPrice'] * df['accumAdjFactor']
- df['closePrice'] = df['closePrice'] * df['accumAdjFactor']
- df['highestPrice'] = df['highestPrice'] * df['accumAdjFactor']
- df['lowestPrice'] = df['lowestPrice'] * df['accumAdjFactor']
-
- if len(df)<=0:
- continue
-
- df_merge = pd.merge(index_df,df,on='o_date',how='inner')
- corropen = np.corrcoef(df_merge['openIndex'],df_merge['openPrice'])[0][1]
- corrclose = np.corrcoef(df_merge['closeIndex'],df_merge['closePrice'])[0][1]
- corrhigh = np.corrcoef(df_merge['highestIndex'],df_merge['highestPrice'])[0][1]
- corrlow = np.corrcoef(df_merge['lowestIndex'],df_merge['lowestPrice'])[0][1]
-
- similar_val = (corropen + corrclose + corrhigh + corrlow)/4
- # similar_val = corrlow
-
- res_list.append({
- 'ticker':ticker,
- 'secName':secName,
- 'similar_val':similar_val
- })
- pass
- df00 = pd.DataFrame(res_list)
- df_similar = df00['similar_val'].value_counts(bins=10,sort=False)
- similar_index_list00 = df_similar.index.tolist()
- similar_bar_y = df_similar.values.tolist()
- similar_bar_xTick = []
- similar_bar_x = []
- for i,item in enumerate(similar_index_list00):
- similar_bar_x.append(i)
- similar_bar_xTick.append((i,str(item)))
- pass
-
- similar_bar_data = {
- 'title_str':'相似度分布',
- 'x':similar_bar_x,
- 'y':similar_bar_y,
- 'xTick':similar_bar_xTick
- }
-
- # 指数文件名
- index_file_name = os.path.basename(index_data_dir)
-
- results_data = {
- 'df':df00,
- 'start_date_str':start_date_str,
- 'end_date_str':end_date_str,
- 'index_file_name':index_file_name,
- 'bar_data':similar_bar_data
- }
- return results_data
导入需要的包
- import sys,json,os,math,time,talib
- from threading import Thread
- import numpy as np
- import pandas as pd
- from datetime import datetime
- from dateutil.relativedelta import relativedelta
- from typing import Dict,Any,List
- from PyQt5 import QtCore,QtGui,QtWidgets
- from PyQt5.QtCore import Qt
- import pyqtgraph as pg
- import pyqtgraph.examples
- pg.setConfigOption('background','k')
- pg.setConfigOption('foreground','w')
pyqtgraph横坐标日期倾斜显示控件
- class RotateAxisItem(pg.AxisItem):
- def drawPicture(self, p, axisSpec, tickSpecs, textSpecs):
- p.setRenderHint(p.Antialiasing,False)
- p.setRenderHint(p.TextAntialiasing,True)
-
- ## draw long line along axis
- pen,p1,p2 = axisSpec
- p.setPen(pen)
- p.drawLine(p1,p2)
- p.translate(0.5,0) ## resolves some damn pixel ambiguity
-
- ## draw ticks
- for pen,p1,p2 in tickSpecs:
- p.setPen(pen)
- p.drawLine(p1,p2)
-
- ## draw all text
- # if self.tickFont is not None:
- # p.setFont(self.tickFont)
- p.setPen(self.pen())
- for rect,flags,text in textSpecs:
- # this is the important part
- p.save()
- p.translate(rect.x(),rect.y())
- p.rotate(-30)
- p.drawText(-rect.width(),rect.height(),rect.width(),rect.height(),flags,text)
- # restoring the painter is *required*!!!
- p.restore()
K线图控件
- class CandlestickItem(pg.GraphicsObject):
- def __init__(self, data):
- pg.GraphicsObject.__init__(self)
- self.data = data ## data must have fields: time, open, close, min, max
- self.generatePicture()
-
- def generatePicture(self):
- ## pre-computing a QPicture object allows paint() to run much more quickly,
- ## rather than re-drawing the shapes every time.
- self.picture = QtGui.QPicture()
- p = QtGui.QPainter(self.picture)
- p.setPen(pg.mkPen('d'))
- w = (self.data[1][0] - self.data[0][0]) / 3.
- for (t, open, close, min, max) in self.data:
- p.drawLine(QtCore.QPointF(t, min), QtCore.QPointF(t, max))
- if open < close:
- p.setBrush(pg.mkBrush('r'))
- else:
- p.setBrush(pg.mkBrush('g'))
- p.drawRect(QtCore.QRectF(t-w, open, w * 2, close - open))
- p.end()
-
- def paint(self, p, *args):
- p.drawPicture(0, 0, self.picture)
-
- def boundingRect(self):
- ## boundingRect _must_ indicate the entire area that will be drawn on
- ## or else we will get artifacts and possibly crashing.
- ## (in this case, QPicture does all the work of computing the bouning rect for us)
- return QtCore.QRectF(self.picture.boundingRect())
条形图控件
- class PyQtGraphHistWidget(QtWidgets.QWidget):
- def __init__(self):
- super().__init__()
- self.init_data()
- self.init_ui()
- def init_data(self):
- self.color_bar = ()
- pass
- def init_ui(self):
- self.title_label = QtWidgets.QLabel('均线排列状态')
- self.title_label.setAlignment(Qt.AlignCenter)
- xax = RotateAxisItem(orientation='bottom')
- xax.setHeight(h=50)
- self.pw = pg.PlotWidget(axisItems={'bottom': xax})
- self.pw.setMouseEnabled(x=True, y=False)
- # self.pw.enableAutoRange(x=False,y=True)
- self.pw.setAutoVisible(x=False, y=True)
- layout = QtWidgets.QVBoxLayout()
- layout.addWidget(self.title_label)
- layout.addWidget(self.pw)
- self.setLayout(layout)
- pass
- def set_data(self,data:Dict[str,Any]):
- title_str = data['title_str']
- x = data['x']
- y = data['y']
- xTick = [data['xTick']]
-
- self.y_datas = y
- self.x_data = xTick
- self.x_Tick = data['xTick']
-
- self.title_label.setText(title_str)
- xax = self.pw.getAxis('bottom')
- xax.setTicks(xTick)
-
- barItem = pg.BarGraphItem(x=x,height=y,width=0.8,brush=(107,200,224))
- self.pw.addItem(barItem)
-
- self.vLine = pg.InfiniteLine(angle=90, movable=False)
- self.hLine = pg.InfiniteLine(angle=0, movable=False)
- self.label = pg.TextItem()
-
- self.pw.addItem(self.vLine, ignoreBounds=True)
- self.pw.addItem(self.hLine, ignoreBounds=True)
- self.pw.addItem(self.label, ignoreBounds=True)
- self.vb = self.pw.getViewBox()
- self.proxy = pg.SignalProxy(self.pw.scene().sigMouseMoved, rateLimit=60, slot=self.mouseMoved)
- # 显示整条折线图
- self.pw.enableAutoRange()
- pass
- def mouseMoved(self,evt):
- pos = evt[0]
- if self.pw.sceneBoundingRect().contains(pos):
- mousePoint = self.vb.mapSceneToView(pos)
- index = int(mousePoint.x())
- if index >= 0 and index < len(self.y_datas):
- x_str = str(self.x_Tick[index][1])
-
- y_str_html = ''
- y_str = str(self.y_datas[index])
- y_str_html += ' ' + y_str
- html_str = '
'
+ x_str +'
'+y_str_html+ '' - self.label.setHtml(html_str)
- self.label.setPos(mousePoint.x(), mousePoint.y())
- self.vLine.setPos(mousePoint.x())
- self.hLine.setPos(mousePoint.y())
- pass
- pass
K线图控件
- class PyQtGraphKWidget(QtWidgets.QWidget):
- def __init__(self):
- super().__init__()
- self.init_data()
- self.init_ui()
- def init_data(self):
- # https://www.sioe.cn/yingyong/yanse-rgb-16/
- self.color_line = (30, 144, 255)
- # 0 幽灵的白色; 1 纯黄; 2 紫红色; 3 纯绿; 4 道奇蓝
- self.color_list = [(248, 248, 255), (255, 255, 0), (255, 0, 255), (0, 128, 0), (30, 144, 255)]
- self.main_fixed_target_list = [] # 主体固定曲线,不能被删除
- self.whole_df = None
- self.whole_header = None
- self.whole_pd_header = None
- self.current_whole_data = None
- self.current_whole_df = None
- pass
- def init_ui(self):
- self.whole_duration_label = QtWidgets.QLabel('左边界~右边界')
-
- self.title_label = QtWidgets.QLabel('执行过程查看')
- self.title_label.setAlignment(Qt.AlignCenter)
- self.title_label.setStyleSheet('QLabel{font-size:18px;font-weight:bold}')
-
- xax = RotateAxisItem(orientation='bottom')
- xax.setHeight(h=80)
- self.pw = pg.PlotWidget(axisItems={'bottom': xax})
- self.pw.setMouseEnabled(x=True, y=True)
- # self.pw.enableAutoRange(x=False,y=True)
- self.pw.setAutoVisible(x=False, y=True)
-
- layout_right = QtWidgets.QVBoxLayout()
- layout_right.addWidget(self.title_label)
- layout_right.addWidget(self.whole_duration_label)
- layout_right.addWidget(self.pw)
- self.setLayout(layout_right)
- pass
-
- def set_data(self, data: Dict[str, Any]):
- title_str = data['title_str']
- whole_header = data['whole_header']
- whole_df = data['whole_df']
- whole_pd_header = data['whole_pd_header']
-
- self.whole_header = whole_header
- self.whole_df = whole_df
- self.whole_pd_header = whole_pd_header
-
- self.title_label.setText(title_str)
- self.whole_duration_label.setText(f"{self.whole_df.iloc[0]['tradeDate']}~{self.whole_df.iloc[-1]['tradeDate']}")
-
- self.current_whole_df = self.whole_df.copy()
- self.caculate_and_show_data()
- pass
-
- def caculate_and_show_data(self):
- df = self.current_whole_df.copy()
- df.reset_index(inplace=True)
- df['i_count'] = [i for i in range(len(df))]
- tradeDate_list = df['tradeDate'].values.tolist()
- x = range(len(df))
- xTick_show = []
- x_dur = math.ceil(len(df) / 20)
- for i in range(0, len(df), x_dur):
- xTick_show.append((i, tradeDate_list[i]))
- if len(df) % 20 != 0:
- xTick_show.append((len(df) - 1, tradeDate_list[-1]))
- candle_data = []
- for i, row in df.iterrows():
- candle_data.append(
- (row['i_count'], row['openPrice'], row['closePrice'], row['lowestPrice'], row['highestPrice']))
-
- self.current_whole_data = df.loc[:, self.whole_pd_header].values.tolist()
- # 开始配置显示的内容
- self.pw.clear()
-
- xax = self.pw.getAxis('bottom')
- xax.setTicks([xTick_show])
-
- candle_fixed_target = CandlestickItem(candle_data)
- self.main_fixed_target_list.append(candle_fixed_target)
- self.pw.addItem(candle_fixed_target)
-
- self.vLine = pg.InfiniteLine(angle=90, movable=False)
- self.hLine = pg.InfiniteLine(angle=0, movable=False)
- self.label = pg.TextItem()
-
- self.pw.addItem(self.vLine, ignoreBounds=True)
- self.pw.addItem(self.hLine, ignoreBounds=True)
- self.pw.addItem(self.label, ignoreBounds=True)
-
- self.vb = self.pw.getViewBox()
- self.proxy = pg.SignalProxy(self.pw.scene().sigMouseMoved, rateLimit=60, slot=self.mouseMoved)
- self.pw.enableAutoRange()
- pass
-
- def mouseMoved(self, evt):
- pos = evt[0]
- if self.pw.sceneBoundingRect().contains(pos):
- mousePoint = self.vb.mapSceneToView(pos)
- index = int(mousePoint.x())
- if index >= 0 and index < len(self.current_whole_data):
- target_data = self.current_whole_data[index]
- html_str = ''
- for i, item in enumerate(self.whole_header):
- html_str += f"
{item}:{target_data[i]}" - self.label.setHtml(html_str)
- self.label.setPos(mousePoint.x(), mousePoint.y())
- self.vLine.setPos(mousePoint.x())
- self.hLine.setPos(mousePoint.y())
- pass
-
- def mouseClicked(self, evt):
- pass
-
- def updateViews(self):
- pass
- pass
分页表格控件
- class PageTableWidget(QtWidgets.QWidget):
- output_signal = QtCore.pyqtSignal(object)
- def __init__(self):
- super().__init__()
- self.init_data()
- self.init_ui()
- pass
-
- def init_data(self):
- # 表格全数据 二维数组
- self.table_full_data: List[Any] = []
- # 表格右键菜单 {菜单名:索引指向}
- self.table_right_menus: Dict[str, str] = {}
-
- self.total_page_count: int = 0
- self.total_rows_count: int = 0
- self.current_page: int = 1
- self.single_page_rows: int = 50
- pass
-
- def init_ui(self):
- pre_page_btn = QtWidgets.QPushButton('上一页')
- pre_page_btn.clicked.connect(self.pre_page_btn_clicked)
- next_page_btn = QtWidgets.QPushButton('下一页')
- next_page_btn.clicked.connect(self.next_page_btn_clicked)
-
- tip_label_0 = QtWidgets.QLabel('第')
- self.witch_page_lineedit = QtWidgets.QLineEdit()
- self.int_validator = QtGui.QIntValidator()
- self.witch_page_lineedit.setValidator(self.int_validator)
- self.witch_page_lineedit.setMaximumWidth(20)
- tip_label_1 = QtWidgets.QLabel('页')
- go_page_btn = QtWidgets.QPushButton('前往')
- go_page_btn.clicked.connect(self.go_page_btn_clicked)
- layout_witch_page = QtWidgets.QHBoxLayout()
- layout_witch_page.addWidget(tip_label_0)
- layout_witch_page.addWidget(self.witch_page_lineedit)
- layout_witch_page.addWidget(tip_label_1)
- layout_witch_page.addWidget(go_page_btn)
- layout_witch_page.addStretch(1)
-
- layout_pagechange = QtWidgets.QHBoxLayout()
- layout_pagechange.addWidget(pre_page_btn)
- layout_pagechange.addWidget(next_page_btn)
- layout_pagechange.addLayout(layout_witch_page)
-
- self.total_page_count_label = QtWidgets.QLabel(f"共0页")
- self.total_rows_count_label = QtWidgets.QLabel(f"共0行")
- self.current_page_label = QtWidgets.QLabel(f"当前第0页")
- layout_pagestatus = QtWidgets.QHBoxLayout()
- layout_pagestatus.addWidget(self.total_page_count_label)
- layout_pagestatus.addWidget(self.total_rows_count_label)
- layout_pagestatus.addWidget(self.current_page_label)
- layout_pagestatus.addStretch(1)
-
- layout_top = QtWidgets.QVBoxLayout()
- layout_top.addLayout(layout_pagechange)
- layout_top.addLayout(layout_pagestatus)
-
- self.target_table = QtWidgets.QTableWidget()
- self.target_table.setEditTriggers(QtWidgets.QAbstractItemView.NoEditTriggers)
- self.target_table.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
- self.target_table.clicked.connect(self.target_table_clicked)
-
- layout = QtWidgets.QVBoxLayout()
- layout.addLayout(layout_top)
- layout.addWidget(self.target_table)
- self.setLayout(layout)
- pass
-
- def set_table_init_data(self, data: Dict[str, Any]):
- '''设置表头、右键菜单等初始化内容'''
- headers: List[str] = data['headers']
-
- self.target_table.setColumnCount(len(headers))
- self.target_table.setHorizontalHeaderLabels(headers)
- pass
-
- def set_table_full_data(self, data: List[Any]):
- '''表格全数据'''
- self.table_full_data = data
- self.total_rows_count = len(data)
- self.total_page_count = math.ceil(self.total_rows_count / self.single_page_rows)
- self.current_page = 1
-
- self.int_validator.setRange(1, self.total_page_count)
-
- self.total_page_count_label.setText(f"共{self.total_page_count}页")
- self.total_rows_count_label.setText(f"共{self.total_rows_count}行")
- self.caculate_current_show_data()
- pass
-
- def setting_current_pagestatus_label(self):
- self.current_page_label.setText(f"当前第{self.current_page}页")
- pass
-
- def caculate_current_show_data(self):
- '''计算当前要显示的数据'''
- start_dot = (self.current_page - 1) * self.single_page_rows
- end_dot = start_dot + self.single_page_rows
- current_data = self.table_full_data[start_dot:end_dot]
- self.fill_table_content(current_data)
- self.setting_current_pagestatus_label()
- pass
-
- def fill_table_content(self, data: List[Any]):
- self.target_table.clearContents()
- self.target_table.setRowCount(len(data))
- for r_i, r_v in enumerate(data):
- for c_i, c_v in enumerate(r_v):
- cell = QtWidgets.QTableWidgetItem(str(c_v))
- self.target_table.setItem(r_i, c_i, cell)
- pass
- pass
- self.target_table.resizeColumnsToContents()
- pass
-
- def go_page_btn_clicked(self):
- '''前往按钮点击'''
- input_page = self.witch_page_lineedit.text()
- input_page = input_page.strip()
- if not input_page:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请输入要跳转的页码',
- QtWidgets.QMessageBox.Yes
- )
- return
- input_page = int(input_page)
- if input_page < 0 or input_page > self.total_page_count:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '输入的页码超出范围',
- QtWidgets.QMessageBox.Yes
- )
- return
- self.current_page = input_page
- self.caculate_current_show_data()
- pass
-
- def pre_page_btn_clicked(self):
- '''上一页按钮点击'''
- if self.current_page <= 1:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '已经是首页',
- QtWidgets.QMessageBox.Yes
- )
- return
- self.current_page -= 1
- self.caculate_current_show_data()
- pass
-
- def next_page_btn_clicked(self):
- '''下一页按钮点击'''
- if self.current_page >= self.total_page_count:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '已经是最后一页',
- QtWidgets.QMessageBox.Yes
- )
- return
- self.current_page += 1
- self.caculate_current_show_data()
- pass
- def target_table_clicked(self):
- selectedItems = self.target_table.selectedItems()
- if len(selectedItems)<=0:
- return
- res_list = []
- for item in selectedItems:
- res_list.append(item.text())
- self.output_signal.emit(res_list)
- pass
- pass
执行结果显示控件
- class PyQtGraphRunningWidget(QtWidgets.QWidget):
- def __init__(self):
- super().__init__()
- self.init_data()
- self.init_ui()
- pass
- def init_data(self):
- self.pre_output_dir = './'
- self.results_output_dir = './results_output/'
- self.json_file_name_list = []
- self.dailydata_path: str = ''
- self.indexdata_path: str = ''
- self.table_header = ['股票编码','股票简称','相似度']
- self.please_select_str = '---请选择---'
- self.num_sort_map = {
- '升序':True,
- '降序':False
- }
- self.table_pd_header: List[str] = ['ticker','secName','similar_val']
-
- self.table_df = None
- self.current_table_df = None
- pass
- def init_ui(self):
- tip_5 = QtWidgets.QLabel('指数日数据文件夹:')
- self.indexdata_dir_lineedit = QtWidgets.QLineEdit()
- self.indexdata_dir_lineedit.setReadOnly(True)
- indexdata_choice_btn = QtWidgets.QPushButton('选择文件夹')
- indexdata_choice_btn.clicked.connect(self.indexdata_choice_btn_clicked)
-
- tip_0 = QtWidgets.QLabel('股票日数据文件夹:')
- self.dailydata_dir_lineedit = QtWidgets.QLineEdit()
- self.dailydata_dir_lineedit.setReadOnly(True)
- dailydata_choice_btn = QtWidgets.QPushButton('选择文件夹')
- dailydata_choice_btn.clicked.connect(self.dailydata_choice_btn_clicked)
-
- tip_1 = QtWidgets.QLabel('Json结果文件选择:')
- self.json_combox = QtWidgets.QComboBox()
- self.json_combox.addItem(self.please_select_str)
- self.json_combox.currentIndexChanged.connect(self.json_combox_currentIndexChanged)
- refresh_json_btn = QtWidgets.QPushButton('刷新结果下拉表')
- refresh_json_btn.clicked.connect(self.refresh_json_btn_clicked)
-
- layout_top = QtWidgets.QGridLayout()
- layout_top.addWidget(tip_5, 0, 0, 1, 1)
- layout_top.addWidget(self.indexdata_dir_lineedit, 0, 1, 1, 3)
- layout_top.addWidget(indexdata_choice_btn, 0, 4, 1, 1)
- layout_top.addWidget(tip_0,1,0,1,1)
- layout_top.addWidget(self.dailydata_dir_lineedit,1,1,1,3)
- layout_top.addWidget(dailydata_choice_btn,1,4,1,1)
- layout_top.addWidget(tip_1,2,0,1,1)
- layout_top.addWidget(self.json_combox,2,1,1,3)
- layout_top.addWidget(refresh_json_btn,2,4,1,1)
-
- tip_3 = QtWidgets.QLabel('股票名模糊查询')
- self.query_lineedit = QtWidgets.QLineEdit()
- query_btn = QtWidgets.QPushButton('查询')
- query_btn.clicked.connect(self.query_btn_clicked)
- reset_btn = QtWidgets.QPushButton('重置')
- reset_btn.clicked.connect(self.reset_btn_clicked)
-
- tip_4 = QtWidgets.QLabel('持续时间排序:')
- self.num_combox = QtWidgets.QComboBox()
- self.num_combox.addItem(self.please_select_str)
- self.num_combox.addItems(list(self.num_sort_map.keys()))
- self.num_combox.currentIndexChanged.connect(self.num_combox_currentIndexChanged)
-
- layout_query = QtWidgets.QGridLayout()
- layout_query.addWidget(tip_3,0,0,1,1)
- layout_query.addWidget(self.query_lineedit,0,1,1,3)
- layout_query.addWidget(query_btn,0,4,1,1)
- layout_query.addWidget(reset_btn,0,5,1,1)
- layout_query.addWidget(tip_4,1,0,1,1)
- layout_query.addWidget(self.num_combox,1,4,1,2)
-
- self.table = PageTableWidget()
- self.table.set_table_init_data({'headers': self.table_header})
- self.table.output_signal.connect(self.table_output_signal_emit)
-
- self.bar_widget = PyQtGraphHistWidget()
-
- layout_left_up = QtWidgets.QVBoxLayout()
- layout_left_up.addLayout(layout_query)
- layout_left_up.addWidget(self.table)
-
- layout_left = QtWidgets.QVBoxLayout()
- layout_left.addLayout(layout_left_up,3)
- layout_left.addWidget(self.bar_widget,1)
-
- self.index_k_widget = PyQtGraphKWidget()
- self.other_k_widget = PyQtGraphKWidget()
-
- layout_right = QtWidgets.QVBoxLayout()
- layout_right.addWidget(self.other_k_widget)
- layout_right.addWidget(self.index_k_widget)
-
- layout_down = QtWidgets.QHBoxLayout()
- layout_down.addLayout(layout_left,1)
- layout_down.addSpacing(30)
- layout_down.addLayout(layout_right,2)
-
- layout = QtWidgets.QVBoxLayout()
- layout.addLayout(layout_top)
- layout.addLayout(layout_down)
- self.setLayout(layout)
- pass
- def set_json_data(self,data:Dict[str,Any]):
- self.table_df = data['df']
- self.start_date_str = data['start_date_str']
- self.end_date_str = data['end_date_str']
-
- self.current_table_df = self.table_df.copy()
- self.fill_table_data()
-
- bar_data = data['bar_data']
- self.bar_widget.set_data(bar_data)
- pass
- def fill_table_data(self):
- table_data = self.current_table_df.loc[:, self.table_pd_header].values.tolist()
- self.table.set_table_full_data(table_data)
- pass
-
- def table_output_signal_emit(self,data:List):
- # ticker secName status_name status_num
- dailydata_dir = self.dailydata_dir_lineedit.text()
- dailydata_dir = dailydata_dir.strip()
- if len(dailydata_dir)<=0:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请选择股票日数据文件件',
- QtWidgets.QMessageBox.Yes
- )
- return
- daily_file_path = dailydata_dir + '/' + data[0] + '.csv'
- df = pd.read_csv(daily_file_path,encoding='utf-8')
- # 删除停牌的数据
- df = df.loc[df['openPrice'] > 0].copy()
- df['o_date'] = df['tradeDate']
- df['o_date'] = pd.to_datetime(df['o_date'])
- df = df.loc[(df['o_date'] >= self.start_date_str) & (df['o_date']<=self.end_date_str)].copy()
- # 保存未复权收盘价数据
- df['close'] = df['closePrice']
- # 计算前复权数据
- df['openPrice'] = df['openPrice'] * df['accumAdjFactor']
- df['closePrice'] = df['closePrice'] * df['accumAdjFactor']
- df['highestPrice'] = df['highestPrice'] * df['accumAdjFactor']
- df['lowestPrice'] = df['lowestPrice'] * df['accumAdjFactor']
-
- columns_list = ['日期','收盘价','开盘价','最高价','最低价']
- columns_pd_list = ['tradeDate','closePrice','openPrice','highestPrice','lowestPrice']
-
- line_data = {
- 'title_str':data[1],
- 'whole_header':columns_list,
- 'whole_df':df,
- 'whole_pd_header':columns_pd_list
- }
- self.other_k_widget.set_data(line_data)
- pass
- def indexdata_choice_btn_clicked(self):
- path = QtWidgets.QFileDialog.getExistingDirectory(
- self,
- '打开指数日数据所在文件夹',
- self.pre_output_dir
- )
- if not path:
- return
- self.indexdata_path = path
- self.indexdata_dir_lineedit.setText(path)
- pass
- def dailydata_choice_btn_clicked(self):
- path = QtWidgets.QFileDialog.getExistingDirectory(
- self,
- '打开股票日数据所在文件夹',
- self.pre_output_dir
- )
- if not path:
- return
- self.dailydata_path = path
- self.dailydata_dir_lineedit.setText(path)
- pass
- def json_combox_currentIndexChanged(self,cur_i:int):
- cur_txt = self.json_combox.currentText()
- if not cur_txt or cur_txt == self.please_select_str:
- return
- current_json_file_path = self.results_output_dir + cur_txt
- with open(current_json_file_path,'r',encoding='utf-8') as fr:
- obj_json = json.load(fr)
- index_file_name = obj_json['index_file_name']
- start_date_str = obj_json['start_date_str']
- end_date_str = obj_json['end_date_str']
- indexdata_dir = self.indexdata_dir_lineedit.text()
- indexdata_dir = indexdata_dir.strip()
- if len(indexdata_dir) <= 0:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请先选择指数日数据所在文件夹',
- QtWidgets.QMessageBox.Yes
- )
- return
- indexfile_path = indexdata_dir +'/'+ index_file_name
- if not os.path.exists(indexfile_path):
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '不存在,'+index_file_name,
- QtWidgets.QMessageBox.Yes
- )
- return
-
- # 显示指数K线图
- df_index = pd.read_csv(indexfile_path,encoding='utf-8')
- df_index['o_date'] = df_index['tradeDate']
- df_index['o_date'] = pd.to_datetime(df_index['o_date'])
- df_index = df_index.loc[(df_index['o_date']>=start_date_str) & (df_index['o_date']<=end_date_str)].copy()
- df_index['openPrice'] = df_index['openIndex']
- df_index['closePrice'] = df_index['closeIndex']
- df_index['highestPrice'] = df_index['highestIndex']
- df_index['lowestPrice'] = df_index['lowestIndex']
- title_str = index_file_name.split('.')[0]
- whole_header = ['日期','收','开','高','低']
- whole_df = df_index
- whole_pd_header = ['tradeDate','closePrice','openPrice','highestPrice','lowestPrice']
-
- index_line_data = {
- 'title_str':title_str,
- 'whole_header':whole_header,
- 'whole_df':whole_df,
- 'whole_pd_header':whole_pd_header
- }
- self.index_k_widget.set_data(index_line_data)
-
- df = pd.DataFrame(obj_json['df_json'])
- obj_json['df'] = df
-
- self.set_json_data(obj_json)
- pass
- def query_btn_clicked(self):
- query_str = self.query_lineedit.text()
- query_str = query_str.strip()
- if len(query_str)<=0:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请输入要查询的内容',
- QtWidgets.QMessageBox.Yes
- )
- return
- self.status_combox.setCurrentText(self.please_select_str)
- df = self.table_df.copy()
- self.current_table_df = df.loc[df['secName'].str.contains(query_str)].copy()
- self.fill_table_data()
- pass
- def reset_btn_clicked(self):
- self.query_lineedit.setText('')
- self.status_combox.setCurrentText(self.please_select_str)
- self.current_table_df = self.table_df.copy()
- self.fill_table_data()
- pass
- def num_combox_currentIndexChanged(self,cur_i:int):
- cur_txt = self.num_combox.currentText()
- if cur_txt == self.please_select_str:
- return
- self.current_table_df.sort_values(by='similar_val',ascending=self.num_sort_map[cur_txt],inplace=True)
- self.fill_table_data()
- pass
- def refresh_json_btn_clicked(self):
- indexdata_dir = self.indexdata_dir_lineedit.text()
- indexdata_dir = indexdata_dir.strip()
- if len(indexdata_dir)<=0:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请先选择指数日数据所在文件夹',
- QtWidgets.QMessageBox.Yes
- )
- return
- file_list = os.listdir(self.results_output_dir)
- json_file_list = []
- for item in file_list:
- if item.endswith('.json'):
- json_file_list.append(item)
- self.json_file_name_list.extend(json_file_list)
- json_file_set = set(self.json_file_name_list)
- self.json_file_name_list = list(json_file_set)
- self.json_combox.clear()
- self.json_combox.addItem(self.please_select_str)
- self.json_combox.addItems(self.json_file_name_list)
- pass
- pass
策略操作运行控件
- class StrategeMainWidget(QtWidgets.QWidget):
- signal_runcode = QtCore.pyqtSignal(object)
- signal_time = QtCore.pyqtSignal(object)
- def __init__(self):
- super().__init__()
- self.thread_run: Thread = None
- self.thread_time: Thread = None
-
- self.running_graph_widget: QtWidgets.QWidget = None
-
- self.init_data()
- self.init_ui()
- self.register_event()
- pass
- def init_data(self):
- self.pre_output_dir = './'
- self.results_output_dir = './results_output/'
- self.secID_name_file_name = 'secID_name.csv'
- self.please_select_str: str = '--请选择--'
- self.stratege_name_list: List = []
- self.tip_msg_0: str = '1.选择策略所在文件夹;2.选择策略;3.点击运行。'
- self.stratege_path: str = ''
- self.stratege_run_start_time = None
- self.stratege_start = False
- self.current_stratege_py_str: str = ''
- self.dailydata_path:str = ''
- pass
- def init_ui(self):
- self.setWindowTitle('股票K线图相似度工具')
- tip_3 = QtWidgets.QLabel('指数日数据文件:')
- self.indexfilepath_lineedit = QtWidgets.QLineEdit()
- self.indexfilepath_lineedit.setReadOnly(True)
- indexfilepath_choice_btn = QtWidgets.QPushButton('选择文件')
- indexfilepath_choice_btn.clicked.connect(self.indexfilepath_choice_btn_clicked)
-
- tip_2 = QtWidgets.QLabel('股票日数据文件夹:')
- self.dailydata_dir_lineedit = QtWidgets.QLineEdit()
- self.dailydata_dir_lineedit.setReadOnly(True)
- dailydata_choice_btn = QtWidgets.QPushButton('选择文件夹')
- dailydata_choice_btn.clicked.connect(self.dailydata_choice_btn_clicked)
-
- tip_0 = QtWidgets.QLabel('选择策略所在文件夹:')
- self.stratege_dir_lineedit = QtWidgets.QLineEdit()
- self.stratege_dir_lineedit.setReadOnly(True)
- stratege_choice_btn = QtWidgets.QPushButton('选择文件夹')
- stratege_choice_btn.clicked.connect(self.stratege_choice_btn_clicked)
-
- tip_1 = QtWidgets.QLabel('策略:')
- self.stratege_combox = QtWidgets.QComboBox()
- self.stratege_combox.addItem(self.please_select_str)
- self.stratege_combox.currentIndexChanged.connect(self.stratege_combox_currentIndexChanged)
-
- self.run_btn = QtWidgets.QPushButton('运行')
- self.run_btn.clicked.connect(self.run_btn_clicked)
- self.force_stop_btn = QtWidgets.QPushButton('强制停止')
- self.force_stop_btn.clicked.connect(self.force_stop_btn_clicked)
-
- layout_top_left = QtWidgets.QGridLayout()
- layout_top_left.addWidget(tip_3, 0, 0, 1, 1)
- layout_top_left.addWidget(self.indexfilepath_lineedit, 0, 1, 1, 3)
- layout_top_left.addWidget(indexfilepath_choice_btn, 0, 4, 1, 1)
- layout_top_left.addWidget(tip_2,1,0,1,1)
- layout_top_left.addWidget(self.dailydata_dir_lineedit,1,1,1,3)
- layout_top_left.addWidget(dailydata_choice_btn,1,4,1,1)
- layout_top_left.addWidget(tip_0,2,0,1,1)
- layout_top_left.addWidget(self.stratege_dir_lineedit,2,1,1,3)
- layout_top_left.addWidget(stratege_choice_btn,2,4,1,1)
- layout_top_left.addWidget(tip_1,3,0,1,1)
- layout_top_left.addWidget(self.stratege_combox,3,1,1,2)
- layout_top_left.addWidget(self.run_btn,3,3,1,1)
- layout_top_left.addWidget(self.force_stop_btn,3,4,1,1)
-
- self.tip_msg_label = QtWidgets.QLabel()
- self.tip_msg_label.setWordWrap(True)
- self.tip_msg_label.setText(self.tip_msg_0)
- results_output_look_btn = QtWidgets.QPushButton('结果查看')
- results_output_look_btn.clicked.connect(self.results_output_look_btn_clicked)
-
- layout_top_right = QtWidgets.QHBoxLayout()
- layout_top_right.addWidget(self.tip_msg_label)
- layout_top_right.addWidget(results_output_look_btn)
-
- layout_top = QtWidgets.QHBoxLayout()
- layout_top.addLayout(layout_top_left,3)
- layout_top.addSpacing(30)
- layout_top.addLayout(layout_top_right,1)
-
- self.code_textedit = QtWidgets.QTextEdit()
- self.code_textedit.setReadOnly(True)
-
- layout = QtWidgets.QVBoxLayout()
- layout.addLayout(layout_top)
- layout.addWidget(self.code_textedit)
- self.setLayout(layout)
- pass
- def register_event(self):
- self.signal_runcode.connect(self.thread_run_excuted)
- self.signal_time.connect(self.thread_time_excuted)
- pass
- def results_output_look_btn_clicked(self):
- '''策略运行结果查看'''
- if not self.running_graph_widget:
- self.running_graph_widget = PyQtGraphRunningWidget()
- self.running_graph_widget.showMaximized()
- pass
- def indexfilepath_choice_btn_clicked(self):
- path, _ = QtWidgets.QFileDialog.getOpenFileName(
- self,
- '打开指数日数据文件',
- self.pre_output_dir
- )
- if not path:
- return
- self.indexfilepath_lineedit.setText(path)
- pass
- def dailydata_choice_btn_clicked(self):
- path = QtWidgets.QFileDialog.getExistingDirectory(
- self,
- '打开股票日数据所在文件夹',
- self.pre_output_dir
- )
- if not path:
- return
- self.dailydata_path = path+'/'
- self.dailydata_dir_lineedit.setText(path)
- pass
- def stratege_choice_btn_clicked(self):
- '''选择策略所在文件夹'''
- path = QtWidgets.QFileDialog.getExistingDirectory(
- self,
- '打开策略所在文件夹',
- self.pre_output_dir
- )
- if not path:
- return
- self.stratege_path = path
- self.stratege_dir_lineedit.setText(path)
- file_list = os.listdir(path)
- temp_file_list = set(self.stratege_name_list)
- for item in file_list:
- if item.endswith('.py'):
- temp_file_list.add(item)
- self.stratege_name_list = list(temp_file_list)
-
- self.stratege_combox.clear()
- self.stratege_combox.addItem(self.please_select_str)
- self.stratege_combox.addItems(self.stratege_name_list)
- pass
- def stratege_combox_currentIndexChanged(self,cur_i:int):
- cur_txt = self.stratege_combox.currentText()
- if not cur_txt or cur_txt == self.please_select_str:
- self.code_textedit.clear()
- return
- file_path = self.stratege_path + os.path.sep + cur_txt
- with open(file_path,'r',encoding='utf-8') as fr:
- code_txt = fr.read()
- self.code_textedit.setPlainText(code_txt)
- pass
- def run_btn_clicked(self):
- '''运行按钮'''
- # 检查指数日数据文件
- indexfilepath = self.indexfilepath_lineedit.text()
- indexfilepath = indexfilepath.strip()
- if len(indexfilepath)<=0:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请选择指数日数据文件',
- QtWidgets.QMessageBox.Yes
- )
- return
- # 检查股票日数据文件夹
- dailydata_dir = self.dailydata_dir_lineedit.text()
- dailydata_dir = dailydata_dir.strip()
- if len(dailydata_dir)<=0:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请选择股票日数据文件夹',
- QtWidgets.QMessageBox.Yes
- )
- return
- dailydata_file_list = os.listdir(dailydata_dir)
- if len(dailydata_file_list)<=0:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '股票日数据文件夹中没有文件',
- QtWidgets.QMessageBox.Yes
- )
- return
- secID_name_file = self.results_output_dir + self.secID_name_file_name
- if not os.path.exists(secID_name_file):
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '股票码与股票名基础文件不存在',
- QtWidgets.QMessageBox.Yes
- )
- return
-
- py_str = self.code_textedit.toPlainText()
- if len(py_str)<10:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '请选择要执行的策略',
- QtWidgets.QMessageBox.Yes
- )
- return
- self.current_stratege_py_str = py_str
- base_data = {}
- base_df = pd.read_csv(secID_name_file,encoding='utf-8')
- for i,row in base_df.iterrows():
- secID = row['secID']
- secID_arr = secID.split('.')
- base_data[secID_arr[0]] = row['secShortName']
- pass
-
- self.run_btn.setDisabled(True)
- self.stratege_combox.setDisabled(True)
- self.stratege_run_start_time = datetime.now()
- self.stratege_start = True
-
- if self.thread_run:
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '有策略正在运行',
- QtWidgets.QMessageBox.Yes
- )
- return
- pre_data = {
- 'py_str':py_str,
- 'base_data':base_data,
- 'indexfilepath':indexfilepath
- }
- self.thread_run = Thread(
- target=self.running_run_thread,
- args=(pre_data,)
- )
- self.thread_run.start()
- self.thread_time = Thread(
- target=self.running_time_thread
- )
- self.thread_time.start()
- pass
- def force_stop_btn_clicked(self):
- '''强制停止按钮'''
- self.thread_run = None
- self.thread_time = None
-
- self.run_btn.setDisabled(False)
- self.stratege_combox.setDisabled(False)
- self.stratege_start = False
- pass
- def running_run_thread(self,data:Dict[str,Any]):
- '''执行代码线程'''
- py_str = data['py_str']
- base_data = data['base_data']
- indexfilepath = data['indexfilepath']
-
- namespace = {}
- fun_stragegy = compile(py_str,'
' ,'exec') - exec(fun_stragegy,namespace)
- ret = namespace['excute_strategy'](base_data,self.dailydata_path,indexfilepath)
- self.signal_runcode.emit(ret)
- pass
- def running_time_thread(self):
- '''计时线程'''
- while self.stratege_start:
- now = datetime.now()
- interval_time = (now-self.stratege_run_start_time).seconds
- res_map = {'res':interval_time}
- self.signal_time.emit(res_map)
- time.sleep(1)
- pass
- def thread_run_excuted(self,data:Dict):
- '''策略代码执行返回结果'''
- self.run_btn.setDisabled(False)
- self.stratege_combox.setDisabled(False)
-
- # 保存结果文件
- now_datetime_str = datetime.now().strftime('%Y%m%d%H%M%S')
- df = data['df']
- df_json = df.to_dict(orient='records')
- pre_save_data = {
- 'df_json':df_json,
- 'end_date_str':data['end_date_str'],
- 'start_date_str':data['start_date_str'],
- 'index_file_name':data['index_file_name'],
- 'bar_data':data['bar_data']
- }
- with open(self.results_output_dir + now_datetime_str + '.json','w',encoding='utf-8') as fw:
- json.dump(pre_save_data,fw)
- if not self.running_graph_widget:
- self.running_graph_widget = PyQtGraphRunningWidget()
- self.running_graph_widget.showMaximized()
- self.thread_run = None
- self.thread_time = None
- self.stratege_start = False
-
- QtWidgets.QMessageBox.information(
- self,
- '提示',
- '当前策略运行完毕',
- QtWidgets.QMessageBox.Yes
- )
- pass
- def thread_time_excuted(self,data:Dict):
- '''计时返回结果'''
- res = data['res']
- self.tip_msg_label.setText(f"{res}s")
- pass
- def closeEvent(self, a0: QtGui.QCloseEvent) -> None:
- if self.thread_time:
- self.thread_time.join()
- if self.thread_run:
- self.thread_run.join()
- if self.running_graph_widget:
- self.running_graph_widget.close()
- self.close()
- if __name__ == '__main__':
- QtCore.QCoreApplication.setAttribute(QtCore.Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
- app = QtWidgets.QApplication(sys.argv)
- t_win = StrategeMainWidget()
- t_win.showMaximized()
- app.exec()
- pass
1. 在入口代码对应的py文件的同一目录下创建 results_output 目录
2. 下载数据,将 secID_name.csv 文件放入 results_output 目录里
3. 程序运行策略后,会把执行结果放入 results_output 目录中,以json文件存储

链接:https://pan.baidu.com/s/1acghH4GQyu_OaOUmEjPdHw
提取码:6toe