• Beaustiful Soup爬虫案例


    1 第三方库

    • 获取随机UA
    pip install fake-useragent
    
    • 1
    • 连接数据库
    $ pip3 install PyMySQL
    
    • 1
    • 发起请求
    pip install requests
    
    • 1
    • 解析页面
    pip install beautifulsoup4
    
    • 1
    • 进度条
    pip install tqdm
    
    • 1

    2 爬取

    2.1 初始化函数

    • 新建爬虫类
    class mySpider:
    
    • 1
    • 创建数据库连接和初始化url
    # 初始化url
        def __init__(self, url):
            self.url = url
            # 计数,请求一个页面的次数,初始值为1
            self.count = 1
            # 数据库连接对象
            self.db = pymysql.connect(
                host='localhost',
                port=3306,
                user='root',
                password='123456',
                database='test')
            # 创建游标对象
            self.cursor = self.db.cursor()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.2 结束时关闭数据库

    • 关闭数据库释放资源,方法运行完后调用。
    # 结束断开数据库连接
        def __del__(self):
            self.cursor.close()
            self.db.close()
            print("关闭数据库!")
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.3 生成header

    • 使用第三方库fake-useragent生成随机UA
    # 获取一个header
        def getHeader(self):
            # 实例化ua对象
            ua = UserAgent()
            # 随机获取一个ua
            headers = {'User-Agent': ua.random}
            return headers
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.4 获取请求body

    • 注意有返回值的递归,要把返回值返回,回调时加return
    def getBody(self, url, send_type, data):
            # 每次请求都随机停顿一些时间
            # time.sleep(random.randint(1, 2))
            # 在超时时间内,对于失败页面尝试请求三次
            if self.count <= 3:
                try:
                    if send_type == 'get':
                        res = requests.get(url=url, headers=self.getHeader(), params=data, timeout=2)
                    elif send_type == 'post':
                        res = requests.post(url=url, headers=self.getHeader(), data=data, timeout=2)
                    else:
                        print("未输入send_type,直接返回None")
                        res = None
                    return res
                except Exception as e:
                    print(e)
                    self.count += 1
                    print(f"第{self.count}次,发起请求")
                    # 再次调用自己,并把值返回,(注意要加return)
                    return self.getBody(url, send_type, data)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.5 解析异步json数据

    • 解析异步json数据
    def parseData(self, dataList):
            # 循环查看详情
            for row in tqdm(dataList, desc='爬取进度'):
                # 请求详情页url
                urlDetail = f"https://www.baidu.com/CTMDS/pub/PUB010100.do?method=handle04&compId={row['companyId']}"
                # 发起请求
                # 每次请求都初始化一次self.count
                self.count = 1
                res = self.getBody(url=urlDetail, send_type='get', data={})
                if res is not None:
                    # 解析html
                    self.parseHtml(row=row, htmlText=res.text)
                else:
                    print(f"{urlDetail}请求失败!")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.6 使用BS4的find方法解析

    • find_all() 方法用来搜索当前 tag 的所有子节点,并判断这些节点是否符合过滤条件,最后以列表形式将符合条件的内容返回,语法格式如下
      find_all( name , attrs , recursive , text , limit )
    • 参数说明
    • name:查找所有名字为 name 的 tag 标签,字符串对象会被自动忽略。
    • attrs:按照属性名和属性值搜索 tag 标签,注意由于 class 是 Python 的关键字吗,所以要使用 “class_”。
    • recursive:find_all() 会搜索 tag 的所有子孙节点,设置 recursive=False 可以只搜索 tag 的直接子节点。
    • text:用来搜文档中的字符串内容,该参数可以接受字符串 、正则表达式 、列表、True。
    • limit:由于 find_all() 会返回所有的搜索结果,这样会影响执行效率,通过 limit 参数可以限制返回结果的数量。
    def parseHtml(self, row, htmlText):
            soup = BeautifulSoup(htmlText, 'html.parser')
            # 获取备案信息
            divList = soup.find_all('div', class_=['col-md-8'])
            divtextList = [re.sub(r'\s+', '', div.text) for div in divList]
    
            # 获取其他机构地址
            divListOther = soup.find_all('div', class_=['col-sm-8'])
            divtextListOther = [re.sub(r'\s+', '', div.text) for div in divListOther]
            otherOrgAdd = ','.join(divtextListOther)
    
            # 插入数据库
            companyId = row['companyId']
            linkTel = row['linkTel']
            recordNo = row['recordNo']
            areaName = row['areaName']
            linkMan = row['linkMan']
            address = row['address']
            compName = row['compName']
            recordStatus = row['recordStatus']
            cancelRecordTime = row.get('cancelRecordTime', '')
            compLevel = divtextList[2]
            recordTime = divtextList[6]
            sql1 = "insert INTO medical_register(company_id,area_name,record_no,comp_name,address,link_man,link_tel,record_status,comp_level,record_time,cancel_record_time,other_org_add) "
            sql2 = f"values('{companyId}','{areaName}','{recordNo}','{compName}','{address}','{linkMan}','{linkTel}','{recordStatus}','{compLevel}','{recordTime}','{cancelRecordTime}','{otherOrgAdd}')"
            sql3 = sql1 + sql2
            # 执行sql
            self.cursor.execute(sql3)
            # 提交
            self.db.commit()
    
            # 获取备案专业和主要研究者信息
            tbody = soup.find('tbody')
            trList = tbody.find_all('tr')
            # 对tr循环获取td
            for tr in trList:
                tdList = tr.find_all('td')
                tdTextList = [td.text for td in tdList]
                tdTextList.insert(0, companyId)
                # print(tdTextList)
                # 插入数据库
                sql4 = "insert into medical_register_sub (company_id,professional_name,principal_investigator,job_title) values(%s,%s,%s,%s)"
                self.cursor.execute(sql4, tdTextList)
                # 提交到数据库
                self.db.commit()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    2.7 写入口函数

    • 这里pageSize直接干到最大,懂的都懂!
    def run(self):
            try:
                # 拿第一页的数据
                data = {'pageSize': 1350, 'curPage': 1}
                # 每次请求都初始化一次self.count
                self.count = 1
                res = self.getBody(url=self.url, send_type='post', data=data)
                if res is not None:
                    # 加载为json
                    jsonRes = json.loads(res.text)
                    # 查看响应状态码
                    status = jsonRes['success']
                    # 如果状态为True
                    if status == True:
                        # 获取数据
                        dataList = jsonRes['data']
                        # 处理数据
                        self.parseData(dataList=dataList)
                else:
                    print(f"{self.url}请求失败")
            except Exception as e:
                print('发生错误!', e)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.8 调用

    • 调用
    if __name__ == '__main__':
        spider = mySpider('https://www.baidu.com/CTMDS/pub/PUB010100.do?method=handle05')
        spider.run()
    
    • 1
    • 2
    • 3

    3 完整代码

    • 完整代码
    import requests
    from bs4 import BeautifulSoup
    from fake_useragent import UserAgent
    import time
    import random
    import json
    import re
    import pymysql
    from tqdm import tqdm
    
    class mySpider:
    
        # 初始化url
        def __init__(self, url):
            self.url = url
            # 计数,请求一个页面的次数,初始值为1
            self.count = 1
            # 数据库连接对象
            self.db = pymysql.connect(
                host='localhost',
                port=3306,
                user='root',
                password='logicfeng',
                database='test2')
            # 创建游标对象
            self.cursor = self.db.cursor()
    
        # 结束断开数据库连接
        def __del__(self):
            self.cursor.close()
            self.db.close()
            print("关闭数据库!")
    
        # 获取一个header
        def getHeader(self):
            # 实例化ua对象
            ua = UserAgent()
            # 随机获取一个ua
            headers = {'User-Agent': ua.random}
            return headers
    
        # 获取请求body
        def getBody(self, url, send_type, data):
            # 每次请求都随机停顿一些时间
            # time.sleep(random.randint(1, 2))
            # 在超时时间内,对于失败页面尝试请求三次
            if self.count <= 3:
                try:
                    if send_type == 'get':
                        res = requests.get(url=url, headers=self.getHeader(), params=data, timeout=2)
                    elif send_type == 'post':
                        res = requests.post(url=url, headers=self.getHeader(), data=data, timeout=2)
                    else:
                        print("未输入send_type,直接返回None")
                        res = None
                    return res
                except Exception as e:
                    print(e)
                    self.count += 1
                    print(f"第{self.count}次,发起请求")
                    # 再次调用自己,并把值返回,(注意要加return)
                    return self.getBody(url, send_type, data)
    
        # 解析body
        def parseData(self, dataList):
            # 循环查看详情
            for row in tqdm(dataList, desc='爬取进度'):
                # 请求详情页url
                urlDetail = f"https://www.baidu.com/CTMDS/pub/PUB010100.do?method=handle04&compId={row['companyId']}"
                # 发起请求
                # 每次请求都初始化一次self.count
                self.count = 1
                res = self.getBody(url=urlDetail, send_type='get', data={})
                if res is not None:
                    # 解析html
                    self.parseHtml(row=row, htmlText=res.text)
                else:
                    print(f"{urlDetail}请求失败!")
    
        # 解析页面
        def parseHtml(self, row, htmlText):
            soup = BeautifulSoup(htmlText, 'html.parser')
            # 获取备案信息
            divList = soup.find_all('div', class_=['col-md-8'])
            divtextList = [re.sub(r'\s+', '', div.text) for div in divList]
    
            # 获取其他机构地址
            divListOther = soup.find_all('div', class_=['col-sm-8'])
            divtextListOther = [re.sub(r'\s+', '', div.text) for div in divListOther]
            otherOrgAdd = ','.join(divtextListOther)
    
            # 插入数据库
            companyId = row['companyId']
            linkTel = row['linkTel']
            recordNo = row['recordNo']
            areaName = row['areaName']
            linkMan = row['linkMan']
            address = row['address']
            compName = row['compName']
            recordStatus = row['recordStatus']
            cancelRecordTime = row.get('cancelRecordTime', '')
            compLevel = divtextList[2]
            recordTime = divtextList[6]
            sql1 = "insert INTO medical_register(company_id,area_name,record_no,comp_name,address,link_man,link_tel,record_status,comp_level,record_time,cancel_record_time,other_org_add) "
            sql2 = f"values('{companyId}','{areaName}','{recordNo}','{compName}','{address}','{linkMan}','{linkTel}','{recordStatus}','{compLevel}','{recordTime}','{cancelRecordTime}','{otherOrgAdd}')"
            sql3 = sql1 + sql2
            # 执行sql
            self.cursor.execute(sql3)
            # 提交
            self.db.commit()
    
            # 获取备案专业和主要研究者信息
            tbody = soup.find('tbody')
            trList = tbody.find_all('tr')
            # 对tr循环获取td
            for tr in trList:
                tdList = tr.find_all('td')
                tdTextList = [td.text for td in tdList]
                tdTextList.insert(0, companyId)
                # print(tdTextList)
                # 插入数据库
                sql4 = "insert into medical_register_sub (company_id,professional_name,principal_investigator,job_title) values(%s,%s,%s,%s)"
                self.cursor.execute(sql4, tdTextList)
                # 提交到数据库
                self.db.commit()
    
        # 入口函数
        def run(self):
            try:
                # 拿第一页的数据
                data = {'pageSize': 1350, 'curPage': 1}
                # 每次请求都初始化一次self.count
                self.count = 1
                res = self.getBody(url=self.url, send_type='post', data=data)
                if res is not None:
                    # 加载为json
                    jsonRes = json.loads(res.text)
                    # 查看响应状态码
                    status = jsonRes['success']
                    # 如果状态为True
                    if status == True:
                        # 获取数据
                        dataList = jsonRes['data']
                        # 处理数据
                        self.parseData(dataList=dataList)
                else:
                    print(f"{self.url}请求失败")
            except Exception as e:
                print('发生错误!', e)
    
    
    if __name__ == '__main__':
        spider = mySpider('https://www.百度.com/CTMDS/pub/PUB010100.do?method=handle05')
        spider.run()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
  • 相关阅读:
    实验3
    ruoyi-vue版本框架(一)如何下载源码,并且在本地启动
    计算机毕业设计Java游戏资讯网站(系统+程序+mysql数据库+Lw文档)
    RabbitMQ的部署
    数据结构基础学习
    【Git】Gitbash使用ssh 上传本地项目到github
    Java数据结构-哈希表
    什么叫做信息安全?包含哪些内容?与网络安全有什么区别?
    数据可视化
    【Git】03-GitHub
  • 原文地址:https://blog.csdn.net/weixin_43684214/article/details/134196582