In this post we will show how to crawl chapter content from a novel site with Python and store it in MongoDB. We will use requests for the HTTP requests, lxml to parse the HTML, re for regular-expression matching, threading for concurrency, and pymongo to talk to MongoDB.
(Truth be told, I needed data for a course project and had none, so I scraped a little.)
Make sure the following dependencies are installed in your environment:
pip install requests lxml pymongo
The complete implementation follows, explained module by module.
First, import the required libraries:
- import requests
- from lxml import etree
- import re
- import threading
- import time
- from pymongo import MongoClient
To reduce the chance of being blocked by the site, we set request headers:
- headers = {
- 'User-Agent': 'replace this with your own User-Agent'
- }
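For reference, a typical desktop browser User-Agent looks like the one below; the value is only illustrative and is not something specific to the target site:
- headers = {
-     # Example of a common desktop Chrome User-Agent (illustrative value only)
-     'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
-                   '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
- }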
To control concurrency and collect the chapter contents, we set up a global list and a semaphore:
- zhangjie_content = [] # holds the scraped chapters of the current novel
- semaphore = threading.Semaphore(20) # limit concurrency to 20 threads
Connect to the MongoDB database:
- client = MongoClient('mongodb://localhost:27017/')
- db = client.novel_database
- collection = db.novels
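Optionally, you can verify the MongoDB connection up front so the crawler fails fast when the database is not running. A minimal sketch, assuming a local unauthenticated MongoDB instance:
- from pymongo import MongoClient
- from pymongo.errors import ConnectionFailure
-
- client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=3000)
- try:
-     client.admin.command('ping')  # cheap round trip to confirm the server is reachable
- except ConnectionFailure as e:
-     raise SystemExit(f'MongoDB is not reachable: {e}')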
Define a helper function that inserts a record into MongoDB:
- def insert_to_mongodb(title, novel_type, author, update_time, chapters):
- data = {
- "title": title,
- "novel_type": novel_type,
- "author": author,
- "update_time": update_time,
- "zhangjie": chapters
- }
- collection.insert_one(data)
- print(f"插入 {len(chapters)} 章成功:{title}")
Define the function that fetches a single chapter's content:
- def neirong(ur, url, s, retries=3):
- while retries > 0:
- try:
- reps = requests.get(rf'{ur}{url}', headers=headers, timeout=10)
- reps.raise_for_status()
- html = etree.HTML(reps.text)
- if html is None:
- print(f"解析 HTML 内容错误,URL: {ur}/{url}")
- return
-
- chapter = html.xpath('//*[@id="content"]/h1/text()')
- if not chapter:
- print(f"未找到章节标题,URL: {ur}/{url}")
- return
-
- chapter = chapter[0].strip()
- text = html.xpath('//*[@id="htmlContent"]/text()')
- if not text:
- print(f"未找到章节内容,URL: {ur}/{url}")
- return
-
- text = ''.join(text[1:]) # join the remaining text nodes into the chapter body
- zhangjie_content.append({"chapter": chapter, "text": text})
-
- return
- except requests.RequestException as e:
- print(f"请求错误,URL: {ur}{url}, 错误: {e}")
- retries -= 1
- time.sleep(1) # wait a moment before retrying
- print(f"重试次数过多,放弃 URL: {ur}{url}")
Define the function that crawls a novel's chapter list:
- def zhangjie(url, retries=3):
- while retries > 0:
- try:
- reps = requests.get(url, headers=headers, timeout=10)
- reps.raise_for_status()
- html = etree.HTML(reps.text)
- if html is None:
- print(f"解析 HTML 内容错误,URL: {url}")
- return
-
- title = html.xpath('//*[@id="info"]/h1/text()')
- title = title[0].strip() if title else "未知书名"
- novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
- novel_type = novel_type[0].strip() if novel_type else "未知类型"
- author = html.xpath('//*[@id="info"]/p[1]/a/text()')
- author = author[0].strip() if author else "未知作者"
- update_time = html.xpath('//*[@id="info"]/p[3]/text()')
- update_time = update_time[0].strip() if update_time else "未知时间"
-
- option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
- if not option_texts:
- print(f"未找到页码信息,URL: {url}")
- return
-
- zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
- if not zhang:
- print(f"未找到页码匹配,URL: {url}")
- return
- zhang = int(zhang[0])
- print('开始爬取:', title)
-
- s = 0 # counter for how many chapters have been fetched
- for i in range(1, zhang + 1):
- if s >= 100:
- break # stop after 100 chapters
-
- zhangjie_url = f'{url}/index_{i}.html'
- zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
- zhangjie_reps.raise_for_status()
- zhangjie_html = etree.HTML(zhangjie_reps.text)
- if zhangjie_html is None:
- print(f"解析 HTML 内容错误,URL: {zhangjie_url}")
- break
-
- zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
- if not zhangjieLis:
- print(f"未找到章节列表,URL: {zhangjie_url}")
- break
-
- threads = []
-
- for j in zhangjieLis:
- if s >= 100:
- break # stop after 100 chapters
- thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
- threads.append(thread)
- thread.start()
- time.sleep(0.1)
- s += 1 # count this chapter
-
- for thread in threads:
- thread.join()
-
- # insert every scraped chapter for this novel into MongoDB
- insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
- zhangjie_content.clear() # reset the chapter list for the next novel
-
- print(f"已成功记录数据:{title}")
-
- return
- except requests.RequestException as e:
- print(f"请求错误,URL: {url}, 错误: {e}")
- retries -= 1
- time.sleep(1) # wait a moment before retrying
- print(f"重试次数过多,放弃 URL: {url}")
Define a helper that runs a crawl function under the semaphore, so only a limited number of threads hit the site at once:
- def crawl_with_semaphore(target, *args):
- with semaphore: # the semaphore caps how many threads crawl at the same time
- target(*args)
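For what it's worth, the standard library's concurrent.futures.ThreadPoolExecutor bounds concurrency on its own, which can replace the manual Thread-plus-Semaphore bookkeeping. This is only an alternative sketch, not what the code above uses; the fetch_chapter function and the URLs in it are stand-ins:
- from concurrent.futures import ThreadPoolExecutor
-
- def fetch_chapter(base_url, href):
-     # stand-in for neirong(): a real version would download and parse the chapter page
-     return f'{base_url}{href}'
-
- chapter_links = ['/1.html', '/2.html', '/3.html']   # placeholder hrefs
-
- # max_workers plays the same role as Semaphore(20): at most 20 threads run at once
- with ThreadPoolExecutor(max_workers=20) as pool:
-     results = list(pool.map(lambda href: fetch_chapter('http://example.com/book', href), chapter_links))
- print(results)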
Define the main function, which scrapes the novel list from a ranking page and calls the chapter crawler for each book:
- def main(i):
- main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html' # monthly-visits ranking page
- try:
- reps = requests.get(main_url, headers=headers, timeout=10)
- reps.raise_for_status()
- html = etree.HTML(reps.text)
- if html is None:
- print("解析 HTML 内容错误,主页 URL")
- return
-
- novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
- novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']} for novel in novels if
- novel.text and 'href' in novel.attrib]
- for novel in novel_urls:
- zhangjie(novel['url'])
-
- except requests.RequestException as e:
- print(f"请求错误,URL: {main_url}, 错误: {e}")
Finally, define the program entry point and call the main function:
- if __name__ == "__main__":
- for i in range(1, 51): # crawl ranking pages 1 through 50
- main(i)
With that, the example shows how to crawl a novel site's chapter content with Python and store it in MongoDB. For convenience, here is the complete script in one piece:
- import requests
- from lxml import etree
- import re
- import threading
- import time
- from pymongo import MongoClient
-
- # Request headers, to reduce the chance of being blocked by the site
- headers = {
- 'User-Agent': 'replace this with your own User-Agent'
- }
-
- # Globals: chapter storage plus a semaphore that caps concurrency
- zhangjie_content = [] # holds the scraped chapters of the current novel
- semaphore = threading.Semaphore(20) # limit concurrency to 20 threads
-
- # MongoDB connection
- client = MongoClient('mongodb://localhost:27017/')
- db = client.novel_database
- collection = db.novels
-
- def insert_to_mongodb(title, novel_type, author, update_time, chapters):
- """
- Helper that inserts one novel's record into MongoDB.
- """
- data = {
- "title": title,
- "novel_type": novel_type,
- "author": author,
- "update_time": update_time,
- "zhangjie": chapters
- }
- collection.insert_one(data)
- print(f"插入 {len(chapters)} 章成功:{title}")
-
- def neirong(ur, url, s, retries=3):
- while retries > 0:
- try:
- reps = requests.get(rf'{ur}{url}', headers=headers, timeout=10)
- reps.raise_for_status()
- html = etree.HTML(reps.text)
- if html is None:
- print(f"解析 HTML 内容错误,URL: {ur}/{url}")
- return
-
- chapter = html.xpath('//*[@id="content"]/h1/text()')
- if not chapter:
- print(f"未找到章节标题,URL: {ur}/{url}")
- return
-
- chapter = chapter[0].strip()
- text = html.xpath('//*[@id="htmlContent"]/text()')
- if not text:
- print(f"未找到章节内容,URL: {ur}/{url}")
- return
-
- text = ''.join(text[1:]) # join the remaining text nodes into the chapter body
- zhangjie_content.append({"chapter": chapter, "text": text})
-
- return
- except requests.RequestException as e:
- print(f"请求错误,URL: {ur}{url}, 错误: {e}")
- retries -= 1
- time.sleep(1) # wait a moment before retrying
- print(f"重试次数过多,放弃 URL: {ur}{url}")
-
-
- def zhangjie(url, retries=3):
- while retries > 0:
- try:
- reps = requests.get(url, headers=headers, timeout=10)
- reps.raise_for_status()
- html = etree.HTML(reps.text)
- if html is None:
- print(f"解析 HTML 内容错误,URL: {url}")
- return
-
- title = html.xpath('//*[@id="info"]/h1/text()')
- title = title[0].strip() if title else "未知书名"
- novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
- novel_type = novel_type[0].strip() if novel_type else "未知类型"
- author = html.xpath('//*[@id="info"]/p[1]/a/text()')
- author = author[0].strip() if author else "未知作者"
- update_time = html.xpath('//*[@id="info"]/p[3]/text()')
- update_time = update_time[0].strip() if update_time else "未知时间"
-
- option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
- if not option_texts:
- print(f"未找到页码信息,URL: {url}")
- return
-
- zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
- if not zhang:
- print(f"未找到页码匹配,URL: {url}")
- return
- zhang = int(zhang[0])
- print('开始爬取:', title)
-
- s = 0 # counter for how many chapters have been fetched
- for i in range(1, zhang + 1):
- if s >= 100:
- break # stop after 100 chapters
-
- zhangjie_url = f'{url}/index_{i}.html'
- zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
- zhangjie_reps.raise_for_status()
- zhangjie_html = etree.HTML(zhangjie_reps.text)
- if zhangjie_html is None:
- print(f"解析 HTML 内容错误,URL: {zhangjie_url}")
- break
-
- zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
- if not zhangjieLis:
- print(f"未找到章节列表,URL: {zhangjie_url}")
- break
-
- threads = []
-
- for j in zhangjieLis:
- if s >= 100:
- break # stop after 100 chapters
- thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
- threads.append(thread)
- thread.start()
- time.sleep(0.1)
- s += 1 # count this chapter
-
- for thread in threads:
- thread.join()
-
- # insert every scraped chapter for this novel into MongoDB
- insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
- zhangjie_content.clear() # reset the chapter list for the next novel
-
- print(f"已成功记录数据:{title}")
-
- return
- except requests.RequestException as e:
- print(f"请求错误,URL: {url}, 错误: {e}")
- retries -= 1
- time.sleep(1) # wait a moment before retrying
- print(f"重试次数过多,放弃 URL: {url}")
-
- def crawl_with_semaphore(target, *args):
- with semaphore: # the semaphore caps how many threads crawl at the same time
- target(*args)
-
-
-
- # Main function
- def main(i):
- main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html' # monthly-visits ranking page
- try:
- reps = requests.get(main_url, headers=headers, timeout=10)
- reps.raise_for_status()
- html = etree.HTML(reps.text)
- if html is None:
- print("解析 HTML 内容错误,主页 URL")
- return
-
- novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
- novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']} for novel in novels if
- novel.text and 'href' in novel.attrib]
- for novel in novel_urls:
- zhangjie(novel['url'])
-
- except requests.RequestException as e:
- print(f"请求错误,URL: {main_url}, 错误: {e}")
-
- if __name__ == "__main__":
- for i in range(1, 51):
- main(i)
There is still room to make this crawler faster:
Use asynchronous I/O: an asynchronous stack such as aiohttp with asyncio can significantly improve the throughput of the network requests.
Reduce waiting time: the threaded version sleeps 0.1 s before starting each chapter thread; with the semaphore already capping concurrency, most of that fixed delay can be dropped.
Use a connection pool: reusing connections avoids the cost of opening a new TCP connection for every request (see the requests.Session sketch right after this list).
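On the connection-pool point: requests already pools connections when a single Session object is reused, so the threaded version could share one session instead of calling requests.get directly. A minimal sketch (the pool sizes are illustrative):
- import requests
- from requests.adapters import HTTPAdapter
-
- session = requests.Session()
- # keep up to 50 connections alive per host instead of opening a new one for every request
- adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50)
- session.mount('http://', adapter)
- session.mount('https://', adapter)
-
- # then replace requests.get(...) with session.get(...) throughout, e.g.
- # resp = session.get(url, headers=headers, timeout=10)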
The rewrite below uses aiohttp and asyncio for asynchronous I/O. A semaphore (asyncio.Semaphore) still caps the number of concurrent requests to avoid triggering a ban, a fetch coroutine wraps the request logic so that every network call goes through it, and asyncio.gather drives many tasks at once. Because many novels are now crawled concurrently, each call to zhangjie collects its chapters in a local list rather than a shared global, so chapters from different books cannot mix. The result is a noticeably faster crawl that gets the full benefit of asynchronous I/O.
- import aiohttp
- import asyncio
- from lxml import etree
- import re
- from pymongo import MongoClient
-
- # Semaphore that caps how many requests are in flight at once
- semaphore = asyncio.Semaphore(50) # limit concurrency to 50
-
- # MongoDB connection
- client = MongoClient('mongodb://localhost:27017/')
- db = client.novel_database
- collection = db.novels
-
-
- def insert_to_mongodb(title, novel_type, author, update_time, chapters, img_url, jianjie):
- """
- Helper that inserts one novel's record into MongoDB.
- """
- data = {
- "title": title,
- "novel_type": novel_type,
- "author": author,
- "update_time": update_time,
- "zhangjie": chapters,
- 'img_url': img_url,
- 'jianjie': jianjie
- }
- collection.insert_one(data)
- print(f"插入 {len(chapters)} 章成功:{title}")
-
-
- async def fetch(session, url):
- async with semaphore: # the semaphore caps how many requests run at the same time
- try:
- async with session.get(url) as response:
- return await response.text()
- except Exception as e:
- print(f"请求错误,URL: {url}, 错误: {e}")
-
-
- async def neirong(session, base_url, url, chapters):
- try:
- html_str = await fetch(session, f'{base_url}{url}')
- html = etree.HTML(html_str)
- if html is None:
- print(f"解析 HTML 内容错误,URL: {base_url}{url}")
- return
-
- chapter = html.xpath('//*[@id="content"]/h1/text()')
- if not chapter:
- print(f"未找到章节标题,URL: {base_url}{url}")
- return
-
- chapter = chapter[0].strip()
- text = html.xpath('//*[@id="htmlContent"]/text()')
- if not text:
- print(f"未找到章节内容,URL: {base_url}{url}")
- return
-
- text = ''.join(text[1:]) # join the remaining text nodes into the chapter body
- chapters.append({"chapter": chapter, "text": text})
-
- except Exception as e:
- print(f"处理章节内容错误,URL: {base_url}{url}, 错误: {e}")
-
-
- async def zhangjie(session, url):
- try:
- html_str = await fetch(session, url)
- html = etree.HTML(html_str)
- if html is None:
- print(f"解析 HTML 内容错误,URL: {url}")
- return
-
- title = html.xpath('//*[@id="info"]/h1/text()')
- title = title[0].strip() if title else "未知书名"
- novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
- novel_type = novel_type[0].strip() if novel_type else "未知类型"
- author = html.xpath('//*[@id="info"]/p[1]/a/text()')
- author = author[0].strip() if author else "未知作者"
- update_time = html.xpath('//*[@id="info"]/p[3]/text()')
- update_time = update_time[0].strip() if update_time else "未知时间"
- img_url = html.xpath('//*[@id="fmimg"]/img/@src')
- img_url = img_url[0].strip() if img_url else "未知图片"
- jianjie = ''.join(html.xpath('//*[@id="intro"]//text()')).strip() if html.xpath(
- '//*[@id="intro"]//text()') else "未知简介"
-
- option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
- if not option_texts:
- print(f"未找到页码信息,URL: {url}")
- return
-
- zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
- if not zhang:
- print(f"未找到页码匹配,URL: {url}")
- return
- zhang = int(zhang[0])
- print('开始爬取:', title)
- chapters = [] # collect this novel's chapters locally so concurrently crawled novels don't mix
-
- for i in range(1, zhang + 1):
- if len(chapters) >= 100:
- break # stop after 100 chapters
-
- zhangjie_url = f'{url}/index_{i}.html'
- zhangjie_html_str = await fetch(session, zhangjie_url)
- zhangjie_html = etree.HTML(zhangjie_html_str)
- if zhangjie_html is None:
- print(f"解析 HTML 内容错误,URL: {zhangjie_url}")
- break
-
- zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
- if not zhangjieLis:
- print(f"未找到章节列表,URL: {zhangjie_url}")
- break
-
- tasks = []
- for j in zhangjieLis:
- if len(chapters) >= 100:
- break # stop after 100 chapters
- task = asyncio.create_task(neirong(session, url, j, chapters))
- tasks.append(task)
-
- await asyncio.gather(*tasks)
-
- # insert every scraped chapter for this novel into MongoDB
- insert_to_mongodb(title, novel_type, author, update_time, chapters, img_url, jianjie)
-
- print(f"已成功记录数据:{title}")
-
- except Exception as e:
- print(f"处理章节信息错误,URL: {url}, 错误: {e}")
-
-
- async def main():
- async with aiohttp.ClientSession() as session:
- tasks = []
- for i in range(1, 51):
- main_url = f'http://www.biqule.net/top/monthvisit/{i}.html' # monthly-visits ranking page
- task = asyncio.create_task(process_main_page(session, main_url))
- tasks.append(task)
- await asyncio.gather(*tasks)
-
-
- async def process_main_page(session, main_url):
- try:
- html_str = await fetch(session, main_url)
- html = etree.HTML(html_str)
- if html is None:
- print("解析 HTML 内容错误,主页 URL")
- return
-
- novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
- novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']} for novel in novels if
- novel.text and 'href' in novel.attrib]
- tasks = []
- for novel in novel_urls:
- task = asyncio.create_task(zhangjie(session, novel['url']))
- tasks.append(task)
- await asyncio.gather(*tasks)
-
- except Exception as e:
- print(f"处理主页面错误,URL: {main_url}, 错误: {e}")
-
-
- if __name__ == "__main__":
- asyncio.run(main())
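One thing the asynchronous version does not do is set a request timeout, so a stalled connection would hang its task. A hedged sketch of a fetch variant that adds one with aiohttp's ClientTimeout (the 15-second total is an arbitrary choice):
- import asyncio
- import aiohttp
-
- semaphore = asyncio.Semaphore(50)                  # same role as in the code above
- REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=15)  # give up on any single request after 15 s
-
- async def fetch(session, url):
-     async with semaphore:
-         try:
-             async with session.get(url, timeout=REQUEST_TIMEOUT) as response:
-                 response.raise_for_status()  # treat HTTP error codes as failures too
-                 return await response.text()
-         except Exception as e:
-             print(f"请求错误,URL: {url}, 错误: {e}")
-             return None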