技术指标的背离是指技术指标曲线的波动方向与价格曲线的趋势方向不一致,这是使用技术指标时最为重要的一点。在股市中,技术指标的背离分为两种常见的形式,即顶背离和底背离。背离是预示市场走势即将见顶或者见底的依据:在价格还没有发生变化之前,技术指标会提前显示未来股价发展的趋势。
顶部背离,是指在股价上涨的过程中,股价的高点比前一个高点要高,而指标的高点却在下降,此时股票上涨的行情不会持久,投资者需谨慎,应考虑做空操作,以免被套牢。
底部背离,是指在股价下跌的过程中,技术指标的低点虽然比上一次的低点有所提高,但是股价仍然在下跌不止,此时股价的下跌行情快要见底。这是典型的买进信号,投资者可以做好准备,在适当的时间买入股票。
在股市中,几乎所有的技术指标都具有背离提示的功能,例如MACD、RSI和BOLL等。投资者可以利用这些指标的背离来预测头部的风险和底部的买入机会,但在选择时间参数时应当适当延长。由于日线存在着较多的骗线,一些技术指标会反复发出背离信号,使得其实用性不强,因此建议重点关注周线上的技术指标背离现象。
PS:以上知识来自书籍《同花顺炒股从入门到精通》
目录

思路:
1. 找出K线的波峰波谷
2. 找出指标的波峰波谷
3. 将K线的波峰波谷和指标的波峰波谷对齐(即将指标的高点和K线的高点对齐,指标的低点和K线的低点对齐,无法对齐的舍弃)。本策略的对齐规则是:在K线高点(或低点)位置前后3个位置的范围内,如果指标也出现了对应的高点(或低点),就算对齐;有多个候选点时,取距离最近的那个点
4. K线和指标的 高低点对齐后,每两个高点对比,如果K线升高,指标降低,或K线降低,指标升高,说明这对高点是顶背离;每两个低点对比,如果K线升高,指标降低,或K线降低,指标升高,说明是底背离。
def _first_confirmed_extreme(window, running, confirm_bars):
    """Return the ``i_row`` of the first row where ``running`` stops changing.

    A row qualifies when the difference between ``running`` and its next value
    is zero on that row and on each of the following ``confirm_bars - 1`` rows.
    Returns ``None`` when no row qualifies.
    """
    step = running - running.shift(-1)
    held = step == 0
    for offset in range(1, confirm_bars):
        held &= step.shift(-offset) == 0
    hits = window.loc[held, 'i_row']
    if hits.empty:
        return None
    # Cast so callers always receive a plain int, whatever dtype pandas hands back.
    return int(hits.iloc[0])


def res_peak_trough_list(df, peak_col, trough_col, confirm_bars=5, min_gap=2):
    """Find alternating peaks and troughs in a series.

    Starting from ``i_row == 0`` the function repeatedly takes the rows at or
    after the current start, computes the running maximum of ``peak_col`` and
    the running minimum of ``trough_col``, and looks for the first row where
    each running extreme stays unchanged for ``confirm_bars`` further steps.
    Whichever of the two comes first is recorded (if it is the kind expected
    next) and the search restarts from the other one.

    Afterwards, any adjacent peak/trough pair that is at most ``min_gap`` rows
    apart is discarded.

    :param df: DataFrame with an ``i_row`` column; the loop compares it with
        ``len(df)``, so it is expected to number the rows 0..len(df)-1
    :param peak_col: column searched for peaks
    :param trough_col: column searched for troughs
    :param confirm_bars: number of consecutive unchanged steps of the running
        extreme required to accept a candidate (default 5)
    :param min_gap: peak/trough neighbours this close or closer are dropped
        (default 2)
    :return: ``(peak_list, trough_list)``, two ascending lists of ``i_row`` ints
    """
    peaks = []
    troughs = []
    start = 0
    want_peak = False
    want_trough = False
    n_rows = len(df)

    while start < n_rows:
        window = df.loc[df['i_row'] >= start]
        peak_0 = _first_confirmed_extreme(window, window[peak_col].cummax(), confirm_bars)
        trough_0 = _first_confirmed_extreme(window, window[trough_col].cummin(), confirm_bars)

        if peak_0 is None or trough_0 is None:
            # Only one kind (or none) can still be confirmed: record it if it is
            # the kind we were waiting for, then stop.
            if peak_0 is None and trough_0 is not None and want_trough:
                troughs.append(trough_0)
            elif trough_0 is None and peak_0 is not None and want_peak:
                peaks.append(peak_0)
            break

        if peak_0 == trough_0:
            # Flat stretch: the same row is both candidates, so step past it.
            start += 1
            continue

        if peak_0 == 0:
            want_peak, want_trough = False, True
            peaks.append(peak_0)
            start = trough_0
        elif trough_0 == 0:
            want_trough, want_peak = False, True
            troughs.append(trough_0)
            start = peak_0
        elif not want_peak and not want_trough:
            # First decision: take whichever extreme comes first.
            if peak_0 < trough_0:
                want_trough = True
                peaks.append(peak_0)
                start = trough_0
            else:
                want_peak = True
                troughs.append(trough_0)
                start = peak_0
        elif peak_0 < trough_0:
            if want_peak:
                peaks.append(peak_0)
            want_peak, want_trough = False, True
            start = trough_0
        else:
            if want_trough:
                troughs.append(trough_0)
            want_trough, want_peak = False, True
            start = peak_0

    # Drop peak/trough neighbours that sit too close together.
    merged = sorted(peaks + troughs)
    peak_set = set(peaks)
    trough_set = set(troughs)
    dropped = set()
    i = 0
    # ``len(merged) - 1`` so that the final adjacent pair is examined as well.
    while i < len(merged) - 1:
        left, right = merged[i], merged[i + 1]
        opposite_kinds = (left in peak_set and right in trough_set) or \
                         (left in trough_set and right in peak_set)
        if opposite_kinds and abs(left - right) <= min_gap:
            dropped.update((left, right))
            i += 2
        else:
            i += 1

    kept = [idx for idx in merged if idx not in dropped]
    peak_list00 = [idx for idx in kept if idx in peak_set]
    trough_list00 = [idx for idx in kept if idx in trough_set]
    return peak_list00, trough_list00
-
# Index-alignment step for top and bottom divergence
def res_alignment_top_and_bottom_deviation_index(first_list, second_list):
    """Pair each index of ``first_list`` with the nearest unused index of ``second_list``.

    Both lists are walked left to right. For every entry of ``first_list`` the
    closest entry among the not-yet-consumed tail of ``second_list`` is chosen
    (the earliest one on a tie). If even the closest one is more than 3 away,
    the entry of ``first_list`` is skipped; otherwise the pair is recorded and
    everything in ``second_list`` up to and including the match is consumed.

    :param first_list: indexes in ascending order
    :param second_list: indexes in ascending order
    :return: ``(aligned_first, aligned_second)``, two lists of equal length
    """
    aligned_first = []
    aligned_second = []
    cursor = 0
    for anchor in first_list:
        if cursor >= len(second_list):
            break
        gaps = [abs(anchor - candidate) for candidate in second_list[cursor:]]
        nearest_gap = min(gaps)
        if nearest_gap > 3:
            continue
        match_pos = cursor + gaps.index(nearest_gap)
        aligned_first.append(anchor)
        aligned_second.append(second_list[match_pos])
        cursor = match_pos + 1
    return aligned_first, aligned_second
-
def _collect_divergent_pairs(df, k_idx, f_idx, k_col, f_col):
    """Scan consecutive aligned extremes and return those that move in opposite directions.

    ``k_idx`` and ``f_idx`` are equally long lists of positional row indexes.
    Each result entry is ``[(idx_a, value_a), (idx_b, value_b)]``.
    """
    k_pairs = []
    f_pairs = []
    k_values = df[k_col]
    f_values = df[f_col]
    i = 0
    # ``- 1`` (not ``- 2``) so the last pair of extremes is compared as well.
    while i < len(k_idx) - 1:
        # int() guards against float indexes, which ``iloc`` rejects.
        k_a, k_b = int(k_idx[i]), int(k_idx[i + 1])
        f_a, f_b = int(f_idx[i]), int(f_idx[i + 1])
        k_a_val, k_b_val = k_values.iloc[k_a], k_values.iloc[k_b]
        f_a_val, f_b_val = f_values.iloc[f_a], f_values.iloc[f_b]
        price_falls_indicator_rises = k_a_val > k_b_val and f_a_val < f_b_val
        price_rises_indicator_falls = k_a_val < k_b_val and f_a_val > f_b_val
        if price_falls_indicator_rises or price_rises_indicator_falls:
            k_pairs.append([(k_a, k_a_val), (k_b, k_b_val)])
            f_pairs.append([(f_a, f_a_val), (f_b, f_b_val)])
            # A point that closed one divergence does not open the next one.
            i += 2
        else:
            i += 1
    return k_pairs, f_pairs


def res_top_and_bottom_deviation(df, k_col_name_list, f_col_name_list):
    """Detect top and bottom divergences between the K-line and an indicator.

    Peaks and troughs are located separately for the K-line columns and the
    indicator columns, aligned with each other, and every two consecutive
    aligned highs (lows) are compared: if one series rises while the other
    falls, the pair is reported as a top (bottom) divergence.

    :param df: DataFrame that must contain an ``i_row`` column; the ``i_row``
        values are used as positional row indexes
    :param k_col_name_list: K-line columns as ``[high column, low column]``
    :param f_col_name_list: indicator columns as ``[high column, low column]``
    :return: ``(k_top_list, f_top_list, k_bottom_list, f_bottom_list)``; each
        entry is ``[(index, value), (index, value)]`` for one divergent pair
    """
    k_peak, k_trough = res_peak_trough_list(df, k_col_name_list[0], k_col_name_list[1])
    f_peak, f_trough = res_peak_trough_list(df, f_col_name_list[0], f_col_name_list[1])
    k_top, f_top = res_alignment_top_and_bottom_deviation_index(k_peak, f_peak)
    k_bottom, f_bottom = res_alignment_top_and_bottom_deviation_index(k_trough, f_trough)

    k_top_list, f_top_list = _collect_divergent_pairs(
        df, k_top, f_top, k_col_name_list[0], f_col_name_list[0])
    k_bottom_list, f_bottom_list = _collect_divergent_pairs(
        df, k_bottom, f_bottom, k_col_name_list[1], f_col_name_list[1])

    return k_top_list, f_top_list, k_bottom_list, f_bottom_list
需要导入的包、日期横坐标控件、K线图控件的代码请查看本人本栏目中其他博文,这里不再赘述。
本文以KDJ指标为例,KDJ显示控件
class KDJ_PlotWidget(pg.PlotWidget):
    """Plot widget that draws the K, D and J curves plus fixed horizontal reference lines."""

    def __init__(self):
        super().__init__()
        self.setLabel('left', 'KDJ')
        xax = RotateAxisItem(orientation='bottom')
        xax.setHeight(h=60)
        self.setAxisItems({'bottom': xax})

        self.color_k = (255, 255, 0)    # pure yellow
        self.color_d = (30, 144, 255)   # dodger blue
        self.color_j = (255, 0, 255)    # magenta

        self.color_white = (248, 248, 255)

    def set_data(self, data: Dict[str, Any]):
        """Redraw the widget from ``data['df']``.

        The frame must provide the columns ``tradeDate``, ``kdj_k``, ``kdj_d``
        and ``kdj_j``. Rows are plotted by position (0..n-1) and ``tradeDate``
        supplies the x-axis tick labels.
        """
        # Work on a re-indexed copy so the caller's frame is left untouched.
        df = data['df'].reset_index(drop=True)
        n_rows = len(df)
        trade_dates = df['tradeDate'].values.tolist()

        # About 20 ticks across the axis; the step is never 0, even for an empty frame.
        tick_step = max(1, math.ceil(n_rows / 20))
        x_ticks = [(i, trade_dates[i]) for i in range(0, n_rows, tick_step)]
        # Always label the last bar, but do not add it twice.
        if x_ticks and x_ticks[-1][0] != n_rows - 1:
            x_ticks.append((n_rows - 1, trade_dates[-1]))

        self.clear()
        self.addLegend()
        self.getAxis('bottom').setTicks([x_ticks])

        x = np.arange(n_rows)
        curve_specs = (
            ('kdj_k', 'K', self.color_k),
            ('kdj_d', 'D', self.color_d),
            ('kdj_j', 'J', self.color_j),
        )
        for column, legend_name, color in curve_specs:
            curve = pg.PlotCurveItem(x=x, y=np.array(df[column].values.tolist()), name=legend_name,
                                     pen=pg.mkPen({'color': color, 'width': 1}), connect='finite')
            self.addItem(curve)

        # Horizontal reference lines at fixed indicator levels.
        for level in (0, 20, 50, 80):
            reference_line = pg.InfiniteLine(pos=(0, level), movable=False, angle=0,
                                             pen=pg.mkPen({'color': self.color_white, 'width': 1}),
                                             label=str(level),
                                             labelOpts=({'position': 0.05}))
            self.addItem(reference_line)

        self.enableAutoRange()
结果显示控件
class PyQtGraphScrollKWidget(QtWidgets.QWidget):
    """Scrollable result view: a candlestick chart followed by one plot per indicator.

    Call :meth:`set_data` to load a frame and the overlays to mark on it.
    """

    def __init__(self):
        super().__init__()
        self.factor_widgets = {}
        self.init_data()
        self.init_ui()

    def init_data(self):
        """Initialise colors and the state that :meth:`set_data` fills in."""
        # https://www.sioe.cn/yingyong/yanse-rgb-16/
        self.color_line = (255, 255, 0)
        self.color_highligh = (220, 20, 60)
        # 0 ghost white; 1 pure yellow; 2 magenta; 3 pure green; 4 dodger blue
        self.color_list = [(248, 248, 255), (255, 255, 0), (255, 0, 255), (0, 128, 0), (30, 144, 255)]
        self.color_green = (34, 139, 34)
        self.color_red = (220, 20, 60)
        self.main_fixed_target_list = []  # fixed items of the main plot (candles, MA curves)
        self.whole_df = None
        self.whole_header = None
        self.whole_pd_header = None
        self.current_whole_data = None
        self.current_whole_df = None
        self.factor_list = None
        self.ma_list = None
        self.show_duration = []
        self.show_line = []
        self.show_segment = []
        self.detail_map = None

        # indicator code -> name of the module-level widget class that draws it
        self.factor_code_widgetname_map = {
            'KDJ': 'KDJ_PlotWidget'
        }

    def init_ui(self):
        """Build the static layout: title, date-range label, screenshot button, scroll area."""
        self.whole_duration_label = QtWidgets.QLabel('左边界~右边界')
        pic_download_btn = QtWidgets.QPushButton('滚动截图')
        pic_download_btn.clicked.connect(self.pic_download_btn_clicked)
        layout_top = QtWidgets.QHBoxLayout()
        layout_top.addWidget(self.whole_duration_label)
        layout_top.addStretch(1)
        layout_top.addWidget(pic_download_btn)

        self.title_label = QtWidgets.QLabel('执行过程查看')
        self.title_label.setAlignment(Qt.AlignCenter)
        self.title_label.setStyleSheet('QLabel{font-size:18px;font-weight:bold}')

        # Scroll area that hosts the plots
        self.pw_layout = QtWidgets.QVBoxLayout()
        self.scroll_area = QtWidgets.QScrollArea()
        self.scroll_area.setWidgetResizable(True)

        layout_right = QtWidgets.QVBoxLayout()
        layout_right.addWidget(self.title_label)
        layout_right.addLayout(layout_top)
        layout_right.addWidget(self.scroll_area)
        self.setLayout(layout_right)

    def set_data(self, data: Dict[str, Any]):
        """Load a data set and redraw everything.

        Keys read from ``data``: ``title_str``, ``whole_header`` (display names
        for the hover label), ``whole_df``, ``whole_pd_header`` (frame columns
        matching ``whole_header``), ``factor_list``, ``ma_list`` and
        ``detail_map``. ``detail_map`` may carry ``duration``, ``line`` and
        ``segment`` overlays for the main plot, plus one optional dict per
        indicator code with the same three keys.
        """
        detail_map = data['detail_map']

        self.whole_header = data['whole_header']
        self.whole_df = data['whole_df']
        self.whole_pd_header = data['whole_pd_header']
        self.factor_list = data['factor_list']
        self.ma_list = data['ma_list']

        # Reset each overlay when the key is absent so that marks from a
        # previous call are not drawn on top of the new data.
        duration = detail_map.get('duration')
        self.show_duration = [] if duration is None else duration
        line = detail_map.get('line')
        self.show_line = [] if line is None else line
        segment = detail_map.get('segment')
        self.show_segment = [] if segment is None else segment
        self.detail_map = detail_map

        self.title_label.setText(data['title_str'])
        self.whole_duration_label.setText(f"{self.whole_df.iloc[0]['tradeDate']}~{self.whole_df.iloc[-1]['tradeDate']}")

        self.current_whole_df = self.whole_df.copy()
        self.caculate_and_show_data()

    def _add_duration_regions(self, plot, spec):
        """Shade x-ranges on ``plot``; ``spec`` is ``[groups_of_(start, end), color_index_per_group]``."""
        groups = spec[0]
        color_indexes = spec[1]
        for group_no, group in enumerate(groups):
            for span in group:
                red, green, blue = self.color_list[color_indexes[group_no]]
                region = pg.LinearRegionItem([span[0], span[1]], movable=False,
                                             brush=(red, green, blue, 50))
                plot.addItem(region)

    def _add_vertical_lines(self, plot, spec):
        """Draw vertical lines on ``plot``; ``spec`` is ``[groups_of_x, color_index_per_group]``."""
        groups = spec[0]
        color_indexes = spec[1]
        for group_no, group in enumerate(groups):
            for x_pos in group:
                marker = pg.InfiniteLine(pos=(x_pos, 0), movable=False, angle=90,
                                         pen=pg.mkPen({'color': self.color_list[color_indexes[group_no]],
                                                       'width': 1}))
                plot.addItem(marker)

    def _add_segments(self, plot, segments):
        """Draw one fixed red line segment on ``plot`` per entry of ``segments``."""
        for end_points in segments:
            plot.addItem(pg.LineSegmentROI(end_points, pen={'color': self.color_red, 'width': 2},
                                           movable=False))

    def caculate_and_show_data(self):
        """Rebuild the candlestick plot, its overlays and every indicator plot."""
        df = self.current_whole_df.copy()
        n_rows = len(df)
        df['i_count'] = list(range(n_rows))
        trade_dates = df['tradeDate'].values.tolist()
        x = range(n_rows)

        # About 20 ticks across the axis; the step is never 0.
        tick_step = max(1, math.ceil(n_rows / 20))
        x_ticks = [(i, trade_dates[i]) for i in range(0, n_rows, tick_step)]
        # Always label the last bar, but do not add it twice.
        if x_ticks and x_ticks[-1][0] != n_rows - 1:
            x_ticks.append((n_rows - 1, trade_dates[-1]))

        candle_columns = ['i_count', 'openPrice', 'closePrice', 'lowestPrice', 'highestPrice']
        candle_data = list(df[candle_columns].itertuples(index=False, name=None))

        self.current_whole_data = df.loc[:, self.whole_pd_header].values.tolist()

        # Start from a fresh main plot and forget items that belonged to the old one.
        self.create_candle_widget()
        self.factor_widgets.clear()
        self.main_fixed_target_list.clear()

        self.pw.getAxis('bottom').setTicks([x_ticks])

        # Overlays on the main plot
        if len(self.show_duration) > 0:
            self._add_duration_regions(self.pw, self.show_duration)
        if len(self.show_line) > 0:
            self._add_vertical_lines(self.pw, self.show_line)
        if len(self.show_segment) > 0:
            self._add_segments(self.pw, self.show_segment)

        candle_fixed_target = CandlestickItem(candle_data)
        self.main_fixed_target_list.append(candle_fixed_target)
        self.pw.addItem(candle_fixed_target)

        for ma_no, ma_col in enumerate(self.ma_list):
            ma_color = self.color_list[ma_no % len(self.color_list)]
            ma_curve = pg.PlotCurveItem(x=np.array(x), y=np.array(df[ma_col].values.tolist()),
                                        pen=pg.mkPen({'color': ma_color, 'width': 1}),
                                        connect='finite')
            self.main_fixed_target_list.append(ma_curve)
            self.pw.addItem(ma_curve)

        # Crosshair and hover label
        self.vLine = pg.InfiniteLine(angle=90, movable=False)
        self.hLine = pg.InfiniteLine(angle=0, movable=False)
        self.label = pg.TextItem()

        self.pw.addItem(self.vLine, ignoreBounds=True)
        self.pw.addItem(self.hLine, ignoreBounds=True)
        self.pw.addItem(self.label, ignoreBounds=True)

        self.vb = self.pw.getViewBox()
        self.proxy = pg.SignalProxy(self.pw.scene().sigMouseMoved, rateLimit=60, slot=self.mouseMoved)

        # Indicator plots
        for factor_code in self.factor_list:
            # Look the class up by name among the module globals instead of eval():
            # same result for module-level classes, without evaluating a string as code.
            widget_cls = globals()[self.factor_code_widgetname_map[factor_code]]
            factor_widget = widget_cls()
            factor_widget.setMinimumHeight(300)
            factor_widget.set_data({'df': self.current_whole_df.copy()})
            factor_widget.setXLink(self.pw)

            factor_detail = self.detail_map.get(factor_code)
            if factor_detail is not None:
                if factor_detail.get('duration') is not None:
                    self._add_duration_regions(factor_widget, factor_detail['duration'])
                if factor_detail.get('line') is not None:
                    self._add_vertical_lines(factor_widget, factor_detail['line'])
                if factor_detail.get('segment') is not None:
                    self._add_segments(factor_widget, factor_detail['segment'])

            self.factor_widgets[factor_code] = factor_widget

        self.fill_pw_widget()
        self.pw.enableAutoRange()

    def mouseMoved(self, evt):
        """Move the crosshair with the mouse and show the hovered bar's values."""
        pos = evt[0]
        if not self.pw.sceneBoundingRect().contains(pos):
            return
        mousePoint = self.vb.mapSceneToView(pos)
        index = int(mousePoint.x())
        if 0 <= index < len(self.current_whole_data):
            target_data = self.current_whole_data[index]
            # NOTE(review): the markup inside this f-string was lost in the
            # extracted source (the literal was split across two lines);
            # "<br/>" is a reconstruction - confirm against the original.
            html_str = ''.join(f"<br/>{name}:{target_data[i]}"
                               for i, name in enumerate(self.whole_header))
            self.label.setHtml(html_str)
            self.label.setPos(mousePoint.x(), mousePoint.y())
        self.vLine.setPos(mousePoint.x())
        self.hLine.setPos(mousePoint.y())

    def mouseClicked(self, evt):
        """Placeholder; does nothing."""
        pass

    def updateViews(self):
        """Placeholder; does nothing."""
        pass

    def fill_pw_widget(self):
        """Put the main plot and all indicator plots into a fresh scroll-area container."""
        # Deleting the old container also deletes its layout and the plots it
        # still owns, so a new layout is built each time rather than reusing
        # one that is about to be destroyed with its parent.
        old_container = self.scroll_area.takeWidget()
        if old_container is not None:
            old_container.deleteLater()

        self.pw_layout = QtWidgets.QVBoxLayout()
        self.pw_layout.addWidget(self.pw)
        for factor_widget in self.factor_widgets.values():
            self.pw_layout.addWidget(factor_widget)

        container = QtWidgets.QWidget()
        container.setLayout(self.pw_layout)
        self.scroll_area.setWidget(container)

    def create_candle_widget(self):
        """Create a new main plot widget with a rotated bottom axis."""
        xax = RotateAxisItem(orientation='bottom')
        xax.setHeight(h=60)
        self.pw = pg.PlotWidget(axisItems={'bottom': xax})
        self.pw.setMinimumHeight(500)
        self.pw.setMouseEnabled(x=True, y=True)
        self.pw.setAutoVisible(x=False, y=True)

    def pic_download_btn_clicked(self):
        """Ask for a file name and save a grab of the scroll area's content widget."""
        now_str = datetime.now().strftime('%Y%m%d%H%M%S')
        path, _ = QtWidgets.QFileDialog.getSaveFileName(
            self,
            '选择图片保存路径',
            f"pic_{now_str}",
            'JPG(*.jpg)'
        )
        if not path:
            return

        widget = self.scroll_area.widget()
        pix = widget.grab()
        pix.save(path)
if __name__ == '__main__':
    # Demo: load one stock's daily bars, compute KDJ, detect divergences and show them.
    csv_path = r'D:/temp005/600660.csv'
    start_date = '2020-01-01'

    df = pd.read_csv(csv_path, encoding='utf-8')
    # Drop rows for suspended trading days
    df = df[df['openPrice'] > 0].copy()
    df['o_date'] = pd.to_datetime(df['tradeDate'])
    df = df[df['o_date'] >= start_date].copy()
    # Keep the unadjusted close
    df['close'] = df['closePrice']
    # Apply the accumulated adjustment factor to all four price columns
    adj_factor = df['accumAdjFactor']
    for price_col in ('openPrice', 'closePrice', 'highestPrice', 'lowestPrice'):
        df[price_col] = df[price_col] * adj_factor

    df = df.reset_index()
    df['i_row'] = list(range(len(df)))

    factor_list = ['KDJ']
    df['kdj_k'], df['kdj_d'] = talib.STOCH(
        df['highestPrice'], df['lowestPrice'], df['closePrice'],
        fastk_period=5, slowk_period=3, slowk_matype=0, slowd_period=3, slowd_matype=0)
    df['kdj_j'] = 3 * df['kdj_k'] - 2 * df['kdj_d']

    # K-line highs/lows are compared with the highs/lows of the kdj_k curve.
    k_top_list, f_top_list, k_bottom_list, f_bottom_list = res_top_and_bottom_deviation(
        df, ['highestPrice', 'lowestPrice'], ['kdj_k', 'kdj_k'])

    detail_map = {
        'segment': k_top_list + k_bottom_list,
        'KDJ': {
            'segment': f_top_list + f_bottom_list
        }
    }

    line_data = {
        'title_str': '寻找顶背离和底背离',
        'whole_header': ['日期', '收盘价', '开盘价', '最高价', '最低价'],
        'whole_df': df,
        'whole_pd_header': ['tradeDate', 'closePrice', 'openPrice', 'highestPrice', 'lowestPrice'],
        'detail_map': detail_map,
        'factor_list': factor_list,
        'ma_list': [],
        'peak_trough_data': None
    }

    QtCore.QCoreApplication.setAttribute(QtCore.Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)
    app = QtWidgets.QApplication(sys.argv)
    t_win = PyQtGraphScrollKWidget()
    t_win.set_data(line_data)
    t_win.showMaximized()
    app.exec()