当使用Python编写一个基于队列的生产者消费者爬虫时,我们通常会使用threading
或multiprocessing
模块来处理并发,并使用queue
模块来管理数据队列。下面是一个详细的示例,该示例展示了如何使用生产者线程生成URL,消费者线程爬取这些URL的内容。
请注意,这里为了简化示例,我们将不会实际进行网页爬取,而是模拟这个过程。在实际应用中,我们可能需要使用如requests
库来发送HTTP请求,并使用如BeautifulSoup
或lxml
来解析HTML内容。
(1)安装必要的库(如果尚未安装)
- bash复制代码
-
- pip install requests beautifulsoup4
(2)示例代码
- import threading
- import queue
- import time
- import random
- from urllib.parse import urljoin
- from bs4 import BeautifulSoup # 导入BeautifulSoup,但在此示例中不会实际使用
- import requests # 导入requests,但在此示例中不会实际发送请求
-
- # 模拟的起始URL和要爬取的网站域名
- START_URL = 'http://example.com'
- BASE_DOMAIN = 'http://example.com'
-
- # 队列,用于在生产者和消费者之间传递URL
- url_queue = queue.Queue()
-
- # 生产者函数,生成并添加URL到队列中
- def producer(url_queue, num_urls):
- print('Producer started.')
- urls_seen = set()
- urls_to_add = [START_URL]
-
- while urls_to_add and num_urls > 0:
- current_url = urls_to_add.pop(0)
- if current_url not in urls_seen:
- urls_seen.add(current_url)
- url_queue.put(current_url)
- num_urls -= 1
-
- # 模拟从当前URL生成新的URL(这里只是简单地添加了一些随机路径)
- for _ in range(random.randint(1, 3)):
- new_path = f"/some/random/path/{random.randint(1, 100)}"
- new_url = urljoin(BASE_DOMAIN, new_path)
- urls_to_add.append(new_url)
-
- print('Producer finished generating', num_urls, 'URLs.')
-
- # 消费者函数,从队列中获取URL并“爬取”内容(模拟)
- def consumer(url_queue):
- print('Consumer started.')
- while not url_queue.empty():
- url = url_queue.get()
- print(f'Crawling {url}...')
-
- # 在这里,我们会使用requests发送HTTP请求,并使用BeautifulSoup解析内容
- # 但为了简化示例,我们只是模拟这个过程
- time.sleep(random.uniform(0.5, 1.5)) # 模拟网络延迟
- print(f'Crawled {url}. Content: (Simulated content)')
-
- # 标记URL为已处理(在实际应用中可能不需要)
- url_queue.task_done()
-
- print('Consumer finished.')
-
- # 创建并启动生产者线程
- producer_thread = threading.Thread(target=producer, args=(url_queue, 10)) # 生成10个URL作为示例
- producer_thread.start()
-
- # 创建并启动多个消费者线程
- num_consumers = 3
- consumer_threads = []
- for _ in range(num_consumers):
- consumer_thread = threading.Thread(target=consumer, args=(url_queue,))
- consumer_thread.start()
- consumer_threads.append(consumer_thread)
-
- # 等待所有消费者线程完成
- for t in consumer_threads:
- t.join()
-
- # 等待生产者线程完成(如果需要的话)
- producer_thread.join()
-
- # 当队列为空时,所有任务都已完成
- print('All tasks completed.')
这个示例展示了如何使用线程和队列来实现生产者消费者模式。生产者线程生成URL并将其添加到队列中,而消费者线程从队列中获取URL并模拟爬取过程。请注意,由于线程共享内存,因此我们需要小心处理对队列的访问,但Python的queue
模块是线程安全的,因此我们可以安全地在多个线程之间传递数据。