• 数据采集: selenium 自动翻页接口调用时的验证码处理


    写在前面


    • 工作中遇到,简单整理
    • 理解不足小伙伴帮忙指正

    对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


    数据采集的过程中,有部分页面会在接口调用一定次数之后,每次获取数据调用接口之后,弹出一个验证码的校验,作为一种反爬措施,对于这种接口调用验证码,一般情况下,是只要请求就跳转,有部分页面是随机的,比如页面中有好多搜索框,可能每个搜索框的change 事件都会发生一次接口调用,这个时候使用 selenium 自动化提提取数据,会导致处理的页面不是想要的的页面,所以对于这种验证码的处理,我们需要在页面任意位置,提供一个检测跳转验证码验证页面的方法,同时对验证码做校验处理。

    下面为一个 Demo

    def cap(driver):
        """
        @Time    :   2023/08/29 03:38:33
        @Author  :   liruilonger@gmail.com
        @Version :   1.0
        @Desc    :   验证码处理
                     Args:
                       driver
                     Returns:
                       void
        """
        import ddddocr
        ocr = ddddocr.DdddOcr()
        time.sleep(3)
        while  len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
                element = driver.find_element(By.XPATH, "//img[ @id='vcodeimg' ]")
                # 清空验证码数据
                driver.execute_script("arguments[0].value = ''", element)
                #定位元素并获取截图文件: 
                element.screenshot("element.png")
                with open("element.png", "rb") as f:
                    image_bytes = f.read()
    
                #image_bytes = BytesIO(base64.b64decode(screenshot))
                text = ocr.classification(image_bytes)
                if len(text) >4:
                    text = text[1:5]
                driver.find_element(By.XPATH, "//input[@id='vcode']").send_keys(text)
                time.sleep(3)
                driver.find_element(By.XPATH, "//input[@class='isOK']").click()
                time.sleep(3)
                # 验证失败重新验证
                if  len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
                    driver.get("https://icp.chinaz.com/captcha")
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    在实际的编写中需要注意的地方:

    • 获取验证码图片的方式,是通过对元素截图,还是对照片路径请求下载获取,需要注意有些验证码图片,在通过 requests 库下载图片时,每次调用都是不同的图片,所以只能使用截图的方式
    • 验证码识别的方式,可以考虑使用 ocr或者深度学习模型,或者一些商业接口,上面使用的 pip install ddddocr
    • 对于识别不准的情况,可以考虑做一些后期的约束处理,比如上面的验证码,4位数字,但是在第一位会出现一个干扰字符,ocr 偶尔会识别为字符,需要做切割处理。
    • 进行识别的时机,以及识别后的处理,对于如何开始识别,可以通过关键字来进行判断,放到入口处,对于识别后验证失败的处理也需要考虑,上面的页面在识别验证成功会进行跳转,错了不发生跳转
    • 对于错误的情况,可以使用死循环的,重新请求,获取新的验证码,直到识别验证成功。

    下面为一个数据采集的实际脚本中的使用。

    #!/usr/bin/env python
    # -*- encoding: utf-8 -*-
    """
    @File    :   icp_reptile.py
    @Time    :   2023/08/23 23:07:46
    @Author  :   Li Ruilong
    @Version :   1.0
    @Contact :   liruilonger@gmail.com
    @Desc    :   验证码版本
    """
    
    # here put the import lib
    
    
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.common.action_chains import ActionChains
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    import time
    import re
    import pandas as pd
    import csv
    import sys
    import os 
    import json
    import requests
    import ddddocr
    from io import BytesIO
    import base64
    import pytesseract
    from PIL import Image
    
    
    a_name = ['河北']
    ocr = ddddocr.DdddOcr()
    
    
    """
    自动登陆,需要提前保存登陆cookie 信息
    """
    driver = webdriver.Chrome()
    with open('C:\\Users\山河已无恙\\Documents\GitHub\\reptile_demo\\demo\\cookie_vip.json', 'r', encoding='u8') as f:
        cookies = json.load(f)
    
    driver.get('https://icp.chinaz.com/provinces')
    for cookie in cookies:
        driver.add_cookie(cookie)
    
    driver.get('https://icp.chinaz.com/provinces')
    
    wait = WebDriverWait(driver, 30)
    
    
    ## 查询条件准备
    
    """
    查询条件准备
    """
    
    #wait.until(EC.presence_of_element_located((By.XPATH, "//span[ @title='chinaz_7052291' ]")))
    
    
    def cap(driver):
        """
        @Time    :   2023/08/29 03:38:33
        @Author  :   liruilonger@gmail.com
        @Version :   1.0
        @Desc    :   验证码处理
                     Args:
                       
                     Returns:
                       void
        """
        time.sleep(3)
        while  len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
                element = driver.find_element(By.XPATH, "//img[ @id='vcodeimg' ]")
                # 清空验证码数据
                driver.execute_script("arguments[0].value = ''", element)
    
    
                #定位元素并获取截图文件: 
                element.screenshot("element.png")
                with open("element.png", "rb") as f:
                    image_bytes = f.read()
    
                #image_bytes = BytesIO(base64.b64decode(screenshot))
                text = ocr.classification(image_bytes)
                if len(text) >4:
                    text = text[1:5]
                driver.find_element(By.XPATH, "//input[@id='vcode']").send_keys(text)
                time.sleep(3)
                driver.find_element(By.XPATH, "//input[@class='isOK']").click()
                time.sleep(3)
                if  len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
                    driver.get("https://icp.chinaz.com/captcha")
    
    
    
    time.sleep(5)
    # 触发验证码处理
    all_butt_cap =  driver.find_element(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )
    
    # 处理验证码的情况
    
    cap(driver)
    
    time.sleep(5)
    
    ### 查询条件准备
    
    # 备案时间
    all_butt =  driver.find_element(By.XPATH,"//div/a[contains(@href,'all') and @class='pr10' ] " )
    driver.execute_script("arguments[0].click();", all_butt)
    cap(driver)
    # 单位性质 
    all_butt =  driver.find_element(By.XPATH,"//div[contains(text(),'全部') and @class='MainCateW-cont SearChoese'] " )
    driver.execute_script("arguments[0].click();", all_butt)
    time.sleep(3)
    cap(driver)
    # 企业
    all_butt =  driver.find_element(By.XPATH,"//a[contains(text(),'企业') and @val='企业' ]" )
    driver.execute_script("arguments[0].click();", all_butt)
    time.sleep(2)
    
    cap(driver)
    # 状态
    all_butt =  driver.find_element(By.XPATH,"//div[contains(text(),'全部') and @id='webStatus_txt'  and @class='MainCateW-cont SearChoese w90'] " )
    
    driver.execute_script("arguments[0].click();", all_butt)
    time.sleep(2)
    # 已开通
    all_butt =  driver.find_element(By.XPATH,"//a[contains(text(),'已开通') and @val='1'  ]" )
    
    driver.execute_script("arguments[0].click();", all_butt)
    time.sleep(2)
    cap(driver)
    
    # 地区
    all_butt =  driver.find_element(By.XPATH,"//strong[contains(text(),'地区:') and @class='CateTit'  ]" )
    next_element = all_butt.find_element(By.XPATH,"following-sibling::*[1]")
    driver.execute_script("arguments[0].click();", next_element)
    time.sleep(2)
    
    
    
    def area(p_name,driver,p_data):
        """
        @Time    :   2023/08/24 04:24:50
        @Author  :   liruilonger@gmail.com
        @Version :   1.0
        @Desc    :   备案数据获取
                     Args:
                       
                     Returns:
                       void
        """
    
       
        all_butt =  driver.find_element(By.XPATH,"//a[contains(text(),'"+p_name+"')   ]" )
        driver.execute_script("arguments[0].click();", all_butt)
        #all_butt.click()
        time.sleep(2)
    
        # 页数太对分盟市处理,后面的数据没办法直接处理
        all_butt =  driver.find_element(By.XPATH,"//div[contains(text(),'全部')  and @id='addrctxt'  ]" )
        time.sleep(2)
        driver.execute_script("arguments[0].click();", all_butt)
        all_ui =  driver.find_element(By.XPATH,"//ul[ @id='addrclst'] " )
        citys =  all_ui.find_elements(By.TAG_NAME,'a')
        for city in  citys:
            try:
                c_n =city.text
            except:
                print("页面异常")
                continue
            
            if c_n == '全部':
                continue
            print("处理市:",c_n)
            time.sleep(2)
            driver.execute_script("arguments[0].click();", city)
            time.sleep(5)
            # 验证码处理
            cap(driver)
    
            time.sleep(5)
             # 查询
            cap(driver)
            # 选择全部
            all_butt =  driver.find_element(By.XPATH,"//div/a[contains(@href,'all') and @val='all'  ] " )
            driver.execute_script("arguments[0].click();", all_butt)
            time.sleep(5)
            cap(driver)
            #all_butt =  driver.find_element(By.XPATH,"//input[ @type='button' and @id='btn_search'  and @value='点击搜索'] " )
            #driver.execute_script("arguments[0].click();", all_butt)
            #time.sleep(4)
            #cap(driver)
            time.sleep(10)
            def all_break(driver):
                if len(driver.find_elements(By.XPATH,"//span[contains(text(),'页,到第')  and @class='col-gray02'] " )) == 0:
                    all_butt =  driver.find_element(By.XPATH,"//div/a[contains(@href,'all') and @val='all'  ] " )
                    driver.execute_script("arguments[0].click();", all_butt)
                    time.sleep(5)
                    cap(driver)
                    time.sleep(10)
            #all_break(driver)
            # 总页数获取
            page_butt =  driver.find_element(By.XPATH,"//span[contains(text(),'页,到第')  and @class='col-gray02'] " )
    
            page_c  = int(re.search(r'\d+', page_butt.text).group())
            # 盟市页数太多分时间段处理 
    
            # 当前页数据处理
            tbody =  driver.find_element(By.XPATH,"//tbody[ @class='result_table' and @id='result_table' ]")
            rows = tbody.find_elements(By.TAG_NAME,'tr')
            for row in rows:
            # 获取当前行中的所有单元格
                cells = row.find_elements(By.TAG_NAME, "td")
                # 打印单元格数据
                data = {}
                data['域名']=cells[0].text
                data['主办单位名称']=cells[1].text
                data['网站首页网址']=cells[5].text
                p_data.append(data)
    
            # 其他页数据处理 
            if page_c >= 101:
               page_c = 101 
            for page_i in range(1,page_c):
                try:
                    print(f"{c_n} :处理页数",page_i)
                    nextPage =  driver.find_element(By.XPATH,"//a[ @title='下一页' and @id='nextPage' ]")
                    driver.execute_script("arguments[0].click();", nextPage)
                    time.sleep(3)
                    cap(driver)
                    all_break(driver)
                    cap(driver)
                    #nextPage.click()
                    time.sleep(6)
                    tbody =  driver.find_element(By.XPATH,"//tbody[ @class='result_table' and @id='result_table' ]")
                    rows = tbody.find_elements(By.TAG_NAME,'tr')
                    print(tbody.text)
                    for row in rows:
                    # 获取当前行中的所有单元格
                        cells = row.find_elements(By.TAG_NAME, "td")
                        # 打印单元格数据
                        data = {}
                        data['域名']=cells[0].text
                        data['主办单位名称']=cells[1].text
                        data['网站首页网址']=cells[5].text
                        p_data.append(data)
                    time.sleep(6)
                
                except:
                    print(f"第 { page_i} 页发生了异常,跳过了")
                    pass 
                finally: 
                    fieldnames = ['域名', '主办单位名称', '网站首页网址']
                    with open('省份_'+a+'_'+"ICP"+'.csv', 'w', newline='',encoding='utf-8') as file:
                        writer = csv.DictWriter(file, fieldnames=fieldnames)
                        writer.writeheader()  # 写入列名
                        writer.writerows(p_data)  # 写入字典数据
            
            print("数据已保存为CSV文件",'  CDN_M_省份_'+a+'_'+'ICP'+'.csv')        
        return p_data        
    
    if __name__ == '__main__':
        for  a in a_name:
            p_data= []
            try:
                p_data = area(a,driver,p_data)
            except:
                continue
                pass
            finally:
                fieldnames = ['域名', '主办单位名称', '网站首页网址']
                with open('省份_'+a+'_'+"ICP"+'.csv', 'w', newline='',encoding='utf-8') as file:
                    writer = csv.DictWriter(file, fieldnames=fieldnames)
                    writer.writeheader()  # 写入列名
                    writer.writerows(p_data)  # 写入字典数据
                    print("数据已保存为CSV文件",'  CDN_M_省份_'+a+'_'+'ICP'+'.csv')  
    
        time.sleep(55555)
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286

    博文部分内容参考

    © 文中涉及参考链接内容版权归原作者所有,如有侵权请告知 😃



    © 2018-2023 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

  • 相关阅读:
    R语言【geometry】——convhulln():计算包含一组点的最小穹顶
    Java基础接口
    基于SpringBoot+RabbitMQ+Redis开发的秒杀系统(异步下单、热点数据缓存、解决超卖)
    .Net CLR
    工业楼控暖通组态恒温检测控制大屏前端UI案例
    【整整600集】python编程零基础自学教程,清华大佬196小时讲完的,全程干货无废话
    手把手教你清除多御安全浏览器的Cookie
    超级计算/先进计算的十大用途
    数据结构——优先级队列(堆)
    OpenVoiceV2本地部署教程,苹果MacOs部署流程,声音响度统一,文字转语音,TTS
  • 原文地址:https://blog.csdn.net/sanhewuyang/article/details/132228243