• Web crawler: scraping an entire novel site


    Scraping novel chapters with Python and storing them in MongoDB

    1. Background

    In this article we show how to crawl the chapter content of a novel website with Python and store it in a MongoDB database. We use requests for HTTP requests, lxml to parse HTML, re for regular-expression matching, threading for concurrency, and pymongo to connect to MongoDB.

    The real motivation: I needed data for a course project and had none, so I scraped a little.

    2. Environment and dependencies

    Make sure the following packages are installed in your environment:

    pip install requests lxml pymongo
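
    Before starting the crawl it is worth confirming that a local MongoDB instance is actually reachable. A minimal sketch using pymongo's standard ping command, assuming the same default localhost:27017 address the code below uses:

    from pymongo import MongoClient
    from pymongo.errors import ConnectionFailure

    client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=2000)
    try:
        client.admin.command('ping')  # cheap round trip to verify the server is up
        print('MongoDB is reachable')
    except ConnectionFailure as e:
        print(f'Cannot reach MongoDB: {e}')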

    3. Code walkthrough

    The complete implementation follows, explained module by module.

    3.1 Importing the required libraries

    First, import the libraries we need:

    import requests
    from lxml import etree
    import re
    import threading
    import time
    from pymongo import MongoClient

    3.2 Request headers

    To reduce the chance of being blocked by the site, we set a request header:

    headers = {
        'User-Agent': '写你自己的'  # placeholder: paste your own browser's User-Agent string here
    }

    3.3 Global variables and semaphore

    To control concurrency and collect chapter content, we set up a global list and a semaphore:

    zhangjie_content = []  # collected chapter content
    semaphore = threading.Semaphore(20)  # cap concurrency at 20 threads

    3.4 MongoDB connection

    Connect to the MongoDB database:

    client = MongoClient('mongodb://localhost:27017/')
    db = client.novel_database
    collection = db.novels
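
    If you plan to re-run the crawler, you may want to avoid storing the same novel twice. This is not part of the original script, but pymongo's create_index makes it a one-liner; a small sketch:

    from pymongo.errors import DuplicateKeyError

    collection.create_index('title', unique=True)  # at most one document per novel title

    try:
        collection.insert_one({'title': 'some novel'})  # 'some novel' is just an example value
    except DuplicateKeyError:
        print('already stored, skipping')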

    3.5 Inserting data into MongoDB

    Define a helper function that writes one novel's data to MongoDB:

    def insert_to_mongodb(title, novel_type, author, update_time, chapters):
        data = {
            "title": title,
            "novel_type": novel_type,
            "author": author,
            "update_time": update_time,
            "zhangjie": chapters
        }
        collection.insert_one(data)
        print(f"Inserted {len(chapters)} chapters: {title}")
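
    After a run you can sanity-check what was stored straight from a Python shell (a quick sketch, assuming the connection settings above):

    doc = collection.find_one({})  # grab any stored novel
    if doc:
        print(doc["title"], "-", len(doc["zhangjie"]), "chapters stored")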

    3.6 Fetching chapter content

    Define the function that downloads a single chapter:

    def neirong(ur, url, s, retries=3):
        # ur: book base URL, url: chapter path, s: chapter counter passed by the caller (not used inside this function)
        while retries > 0:
            try:
                reps = requests.get(rf'{ur}{url}', headers=headers)
                reps.raise_for_status()
                html = etree.HTML(reps.text)
                if html is None:
                    print(f"Failed to parse HTML, URL: {ur}/{url}")
                    return
                chapter = html.xpath('//*[@id="content"]/h1/text()')
                if not chapter:
                    print(f"Chapter title not found, URL: {ur}/{url}")
                    return
                chapter = chapter[0].strip()
                text = html.xpath('//*[@id="htmlContent"]/text()')
                if not text:
                    print(f"Chapter text not found, URL: {ur}/{url}")
                    return
                text = ''.join(text[1:])  # join the text nodes
                zhangjie_content.append({"chapter": chapter, "text": text})
                return
            except requests.RequestException as e:
                print(f"Request error, URL: {ur}{url}, error: {e}")
                retries -= 1
                time.sleep(1)  # wait a moment before retrying
        print(f"Too many retries, giving up on URL: {ur}{url}")

    3.7 Crawling the chapter list

    Define the function that crawls a novel's info page and its chapter list:

    def zhangjie(url, retries=3):
        while retries > 0:
            try:
                reps = requests.get(url, headers=headers, timeout=10)
                reps.raise_for_status()
                html = etree.HTML(reps.text)
                if html is None:
                    print(f"Failed to parse HTML, URL: {url}")
                    return
                title = html.xpath('//*[@id="info"]/h1/text()')
                title = title[0].strip() if title else "unknown title"
                novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
                novel_type = novel_type[0].strip() if novel_type else "unknown genre"
                author = html.xpath('//*[@id="info"]/p[1]/a/text()')
                author = author[0].strip() if author else "unknown author"
                update_time = html.xpath('//*[@id="info"]/p[3]/text()')
                update_time = update_time[0].strip() if update_time else "unknown time"
                option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
                if not option_texts:
                    print(f"Page-count info not found, URL: {url}")
                    return
                # matches "第 N 页(末页)", i.e. "page N (last page)", in the chapter-list pager
                zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
                if not zhang:
                    print(f"Page count not matched, URL: {url}")
                    return
                zhang = int(zhang[0])
                print('Start crawling:', title)
                s = 0  # number of chapters crawled so far
                for i in range(1, zhang + 1):
                    if s >= 100:
                        break  # already fetched 100 chapters, stop
                    zhangjie_url = f'{url}/index_{i}.html'
                    zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
                    zhangjie_reps.raise_for_status()
                    zhangjie_html = etree.HTML(zhangjie_reps.text)
                    if zhangjie_html is None:
                        print(f"Failed to parse HTML, URL: {zhangjie_url}")
                        break
                    zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
                    if not zhangjieLis:
                        print(f"Chapter list not found, URL: {zhangjie_url}")
                        break
                    threads = []
                    for j in zhangjieLis:
                        if s >= 100:
                            break  # already fetched 100 chapters, stop
                        thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
                        threads.append(thread)
                        thread.start()
                        time.sleep(0.1)
                        s += 1  # count chapters
                    for thread in threads:
                        thread.join()
                # insert everything collected for this novel into MongoDB
                insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
                zhangjie_content.clear()  # reset the list for the next novel
                print(f"Data recorded: {title}")
                return
            except requests.RequestException as e:
                print(f"Request error, URL: {url}, error: {e}")
                retries -= 1
                time.sleep(1)  # wait a moment before retrying
        print(f"Too many retries, giving up on URL: {url}")

    3.8 Limiting concurrency with a semaphore

    Define a small wrapper that runs the target function while holding the semaphore, so at most 20 chapter downloads run at the same time:

    def crawl_with_semaphore(target, *args):
        with semaphore:  # acquire the semaphore to cap the number of concurrent downloads
            target(*args)
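
    To see what the semaphore buys you, here is a tiny standalone sketch (the worker function is hypothetical, not part of the crawler) showing that no more than three workers ever run at once:

    import threading
    import time

    sem = threading.Semaphore(3)  # allow at most 3 workers at a time
    lock = threading.Lock()
    active = 0

    def worker(i):
        global active
        with sem:
            with lock:
                active += 1
                print(f"worker {i} started, {active} active")  # never reports more than 3 active
            time.sleep(0.5)  # simulate a slow download
            with lock:
                active -= 1

    threads = [threading.Thread(target=worker, args=(n,)) for n in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()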

    3.9 Main function

    Define the main function, which reads one page of the monthly-visits ranking and crawls each novel listed on it:

    def main(i):
        main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html'  # monthly-visits ranking page
        try:
            reps = requests.get(main_url, headers=headers, timeout=10)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print("Failed to parse HTML for the ranking page")
                return
            novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
            novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']}
                          for novel in novels if novel.text and 'href' in novel.attrib]
            for novel in novel_urls:
                zhangjie(novel['url'])
        except requests.RequestException as e:
            print(f"Request error, URL: {main_url}, error: {e}")

    3.10 Entry point

    Define the entry point and call the main function for each ranking page:

    if __name__ == "__main__":
        for i in range(1, 51):  # crawl ranking pages 1 through 50
            main(i)

    4. Summary

    This article showed how to use Python to crawl chapter content from a novel website and store it in a MongoDB database.

    5. Complete code

    import requests
    from lxml import etree
    import re
    import threading
    import time
    from pymongo import MongoClient

    # Request header, to reduce the chance of being blocked by the site
    headers = {
        'User-Agent': '改成你自己的'  # placeholder: paste your own browser's User-Agent string here
    }

    # Global variables and semaphore for concurrency control and chapter storage
    zhangjie_content = []  # collected chapter content
    semaphore = threading.Semaphore(20)  # cap concurrency at 20 threads

    # MongoDB connection
    client = MongoClient('mongodb://localhost:27017/')
    db = client.novel_database
    collection = db.novels

    def insert_to_mongodb(title, novel_type, author, update_time, chapters):
        """Helper that writes one novel's data to MongoDB."""
        data = {
            "title": title,
            "novel_type": novel_type,
            "author": author,
            "update_time": update_time,
            "zhangjie": chapters
        }
        collection.insert_one(data)
        print(f"Inserted {len(chapters)} chapters: {title}")

    def neirong(ur, url, s, retries=3):
        # ur: book base URL, url: chapter path, s: chapter counter passed by the caller (not used inside this function)
        while retries > 0:
            try:
                reps = requests.get(rf'{ur}{url}', headers=headers)
                reps.raise_for_status()
                html = etree.HTML(reps.text)
                if html is None:
                    print(f"Failed to parse HTML, URL: {ur}/{url}")
                    return
                chapter = html.xpath('//*[@id="content"]/h1/text()')
                if not chapter:
                    print(f"Chapter title not found, URL: {ur}/{url}")
                    return
                chapter = chapter[0].strip()
                text = html.xpath('//*[@id="htmlContent"]/text()')
                if not text:
                    print(f"Chapter text not found, URL: {ur}/{url}")
                    return
                text = ''.join(text[1:])  # join the text nodes
                zhangjie_content.append({"chapter": chapter, "text": text})
                return
            except requests.RequestException as e:
                print(f"Request error, URL: {ur}{url}, error: {e}")
                retries -= 1
                time.sleep(1)  # wait a moment before retrying
        print(f"Too many retries, giving up on URL: {ur}{url}")

    def zhangjie(url, retries=3):
        while retries > 0:
            try:
                reps = requests.get(url, headers=headers, timeout=10)
                reps.raise_for_status()
                html = etree.HTML(reps.text)
                if html is None:
                    print(f"Failed to parse HTML, URL: {url}")
                    return
                title = html.xpath('//*[@id="info"]/h1/text()')
                title = title[0].strip() if title else "unknown title"
                novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
                novel_type = novel_type[0].strip() if novel_type else "unknown genre"
                author = html.xpath('//*[@id="info"]/p[1]/a/text()')
                author = author[0].strip() if author else "unknown author"
                update_time = html.xpath('//*[@id="info"]/p[3]/text()')
                update_time = update_time[0].strip() if update_time else "unknown time"
                option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
                if not option_texts:
                    print(f"Page-count info not found, URL: {url}")
                    return
                # matches "第 N 页(末页)", i.e. "page N (last page)", in the chapter-list pager
                zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
                if not zhang:
                    print(f"Page count not matched, URL: {url}")
                    return
                zhang = int(zhang[0])
                print('Start crawling:', title)
                s = 0  # number of chapters crawled so far
                for i in range(1, zhang + 1):
                    if s >= 100:
                        break  # already fetched 100 chapters, stop
                    zhangjie_url = f'{url}/index_{i}.html'
                    zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
                    zhangjie_reps.raise_for_status()
                    zhangjie_html = etree.HTML(zhangjie_reps.text)
                    if zhangjie_html is None:
                        print(f"Failed to parse HTML, URL: {zhangjie_url}")
                        break
                    zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
                    if not zhangjieLis:
                        print(f"Chapter list not found, URL: {zhangjie_url}")
                        break
                    threads = []
                    for j in zhangjieLis:
                        if s >= 100:
                            break  # already fetched 100 chapters, stop
                        thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
                        threads.append(thread)
                        thread.start()
                        time.sleep(0.1)
                        s += 1  # count chapters
                    for thread in threads:
                        thread.join()
                # insert everything collected for this novel into MongoDB
                insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
                zhangjie_content.clear()  # reset the list for the next novel
                print(f"Data recorded: {title}")
                return
            except requests.RequestException as e:
                print(f"Request error, URL: {url}, error: {e}")
                retries -= 1
                time.sleep(1)  # wait a moment before retrying
        print(f"Too many retries, giving up on URL: {url}")

    def crawl_with_semaphore(target, *args):
        with semaphore:  # acquire the semaphore to cap the number of concurrent downloads
            target(*args)

    # main function
    def main(i):
        main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html'  # monthly-visits ranking page
        try:
            reps = requests.get(main_url, headers=headers, timeout=10)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print("Failed to parse HTML for the ranking page")
                return
            novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
            novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']}
                          for novel in novels if novel.text and 'href' in novel.attrib]
            for novel in novel_urls:
                zhangjie(novel['url'])
        except requests.RequestException as e:
            print(f"Request error, URL: {main_url}, error: {e}")

    if __name__ == "__main__":
        for i in range(1, 51):
            main(i)

    6. Upgraded version (async I/O)

    1. Use asynchronous I/O
      Async libraries such as aiohttp and asyncio can significantly improve the throughput of network requests.

    2. Reduce wait times
      Shorten the delay between consecutive requests.

    3. Use a connection pool
      Reusing connections avoids the overhead of opening a new one for every request (see the sketch after the notes below).

    Notes:

    1. The code switches to aiohttp and asyncio for asynchronous I/O.
    2. A semaphore still limits the number of concurrent requests, to avoid getting banned for sending too many at once.
    3. A fetch function wraps the request logic, and every network request goes through it.
    4. asyncio.gather runs many tasks concurrently.

    This noticeably speeds up crawling and takes full advantage of asynchronous I/O.
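
    The upgraded code below simply relies on aiohttp's default connection pool inside ClientSession. If you want to tune the pool explicitly, aiohttp lets you pass a TCPConnector and a ClientTimeout to the session; a minimal sketch (the limits and the URL are only example values):

    import asyncio
    import aiohttp

    async def make_session():
        # one session = one reusable connection pool for the whole crawl
        connector = aiohttp.TCPConnector(limit=50, limit_per_host=20)  # total / per-host connection caps
        timeout = aiohttp.ClientTimeout(total=10)                      # overall per-request timeout
        return aiohttp.ClientSession(connector=connector, timeout=timeout)

    async def demo():
        async with await make_session() as session:
            async with session.get('http://www.biqule.net/') as resp:  # example request
                print(resp.status)

    if __name__ == '__main__':
        asyncio.run(demo())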

    import aiohttp
    import asyncio
    from lxml import etree
    import re
    from pymongo import MongoClient

    # Global variables and semaphore for concurrency control and chapter storage
    zhangjie_content = []  # collected chapter content
    semaphore = asyncio.Semaphore(50)  # cap concurrency at 50 in-flight requests

    # MongoDB connection
    client = MongoClient('mongodb://localhost:27017/')
    db = client.novel_database
    collection = db.novels

    def insert_to_mongodb(title, novel_type, author, update_time, chapters, img_url, jianjie):
        """Helper that writes one novel's data to MongoDB."""
        data = {
            "title": title,
            "novel_type": novel_type,
            "author": author,
            "update_time": update_time,
            "zhangjie": chapters,
            'img_url': img_url,
            'jianjie': jianjie
        }
        collection.insert_one(data)
        print(f"Inserted {len(chapters)} chapters: {title}")

    async def fetch(session, url):
        async with semaphore:  # limit the number of concurrent requests
            try:
                async with session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Request error, URL: {url}, error: {e}")

    async def neirong(session, base_url, url):
        try:
            html_str = await fetch(session, f'{base_url}{url}')
            html = etree.HTML(html_str)
            if html is None:
                print(f"Failed to parse HTML, URL: {base_url}{url}")
                return
            chapter = html.xpath('//*[@id="content"]/h1/text()')
            if not chapter:
                print(f"Chapter title not found, URL: {base_url}{url}")
                return
            chapter = chapter[0].strip()
            text = html.xpath('//*[@id="htmlContent"]/text()')
            if not text:
                print(f"Chapter text not found, URL: {base_url}{url}")
                return
            text = ''.join(text[1:])  # join the text nodes
            zhangjie_content.append({"chapter": chapter, "text": text})
        except Exception as e:
            print(f"Error while processing chapter content, URL: {base_url}{url}, error: {e}")

    async def zhangjie(session, url):
        try:
            html_str = await fetch(session, url)
            html = etree.HTML(html_str)
            if html is None:
                print(f"Failed to parse HTML, URL: {url}")
                return
            title = html.xpath('//*[@id="info"]/h1/text()')
            title = title[0].strip() if title else "unknown title"
            novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
            novel_type = novel_type[0].strip() if novel_type else "unknown genre"
            author = html.xpath('//*[@id="info"]/p[1]/a/text()')
            author = author[0].strip() if author else "unknown author"
            update_time = html.xpath('//*[@id="info"]/p[3]/text()')
            update_time = update_time[0].strip() if update_time else "unknown time"
            img_url = html.xpath('//*[@id="fmimg"]/img/@src')
            img_url = img_url[0].strip() if img_url else "unknown cover"
            intro_texts = html.xpath('//*[@id="intro"]//text()')
            jianjie = ''.join(intro_texts).strip() if intro_texts else "unknown synopsis"
            option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
            if not option_texts:
                print(f"Page-count info not found, URL: {url}")
                return
            # matches "第 N 页(末页)", i.e. "page N (last page)", in the chapter-list pager
            zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
            if not zhang:
                print(f"Page count not matched, URL: {url}")
                return
            zhang = int(zhang[0])
            print('Start crawling:', title)
            for i in range(1, zhang + 1):
                if len(zhangjie_content) >= 100:
                    break  # already fetched 100 chapters, stop
                zhangjie_url = f'{url}/index_{i}.html'
                zhangjie_html_str = await fetch(session, zhangjie_url)
                zhangjie_html = etree.HTML(zhangjie_html_str)
                if zhangjie_html is None:
                    print(f"Failed to parse HTML, URL: {zhangjie_url}")
                    break
                zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
                if not zhangjieLis:
                    print(f"Chapter list not found, URL: {zhangjie_url}")
                    break
                tasks = []
                for j in zhangjieLis:
                    if len(zhangjie_content) >= 100:
                        break  # already fetched 100 chapters, stop
                    task = asyncio.create_task(neirong(session, url, j))
                    tasks.append(task)
                await asyncio.gather(*tasks)
            # insert everything collected for this novel into MongoDB
            insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content), img_url, jianjie)
            zhangjie_content.clear()  # reset the list for the next novel
            print(f"Data recorded: {title}")
        except Exception as e:
            print(f"Error while processing novel info, URL: {url}, error: {e}")

    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = []
            for i in range(1, 51):
                main_url = f'http://www.biqule.net/top/monthvisit/{i}.html'  # monthly-visits ranking page
                task = asyncio.create_task(process_main_page(session, main_url))
                tasks.append(task)
            await asyncio.gather(*tasks)

    async def process_main_page(session, main_url):
        try:
            html_str = await fetch(session, main_url)
            html = etree.HTML(html_str)
            if html is None:
                print("Failed to parse HTML for the ranking page")
                return
            novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
            novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']}
                          for novel in novels if novel.text and 'href' in novel.attrib]
            tasks = []
            for novel in novel_urls:
                task = asyncio.create_task(zhangjie(session, novel['url']))
                tasks.append(task)
            await asyncio.gather(*tasks)
        except Exception as e:
            print(f"Error while processing ranking page, URL: {main_url}, error: {e}")

    if __name__ == "__main__":
        asyncio.run(main())

  • Original article: https://blog.csdn.net/iku_n/article/details/139509931