• Scrapy基本概念——Item Pipeline


    一、Item Pipeline介绍

    蜘蛛抓取的每一个Item都会被发送到Item Pipeline。根据ITEM_PIPELINES的优先级设置,不同的Item Pipeline依次处理每一个Item,最后可删除该Item不做处理,也可将该Item发送到下一个Item Pipeline。Item Pipeline的主要用途有:

    1、清洗数据

    2、验证数据(检查Item某些字段是否为空)

    3、数据查重

    4、存储数据

    二、Item Pipeline的方法

    1、process_item

    1. 语法:process_item(self, item, spider)
    2. 参数:
    3. item (item object) -- Item实例
    4. spider (Spider object) -- spider实例
    5. 用法:每个Item Pipeline都需要调用此方法,这个方法必须返回一个 Item (或任何继承类)对象, 或是抛出DropItem异常,被丢弃的Item将不会被之后的Item Pipeline所处理。

    2、open_spider

    1. 语法:open_spider(self, spider)
    2. 参数:spider (Spider object) -- spider实例
    3. 用法:当spider打开时调用此方法。

    3、close_spider

    1. 语法:close_spider(self, spider)
    2. 参数:spider (Spider object) -- spider实例
    3. 用法:当spider关闭时调用此方法。

    4、from_crawler

    1. 语法:from_crawler(cls, crawler)
    2. 参数:crawler (Crawler object) -- 使用此管道的爬虫程序
    3. 用法:调用此方法从爬虫程序Crawler生成实例。返回实例对象括号里面的参数,是会进入初始化方法__init__的

    三、Item Pipeline的示例

    1、验证数据——价格数据验证(删除Item中price字段为空的Item)

    1. from itemadapter import ItemAdapter
    2. from scrapy.exceptions import DropItem
    3. class PricePipeline:
    4. vat_factor = 1.15
    5. def process_item(self, item, spider):
    6. adapter = ItemAdapter(item)
    7. if adapter.get('price'):
    8. if adapter.get('price_excludes_vat'):
    9. adapter['price'] = adapter['price'] * self.vat_factor
    10. return item
    11. else:
    12. raise DropItem(f"Missing price in {item}")

    2、存储数据——将项目写入JSON文件

    1. import json
    2. from itemadapter import ItemAdapter
    3. class JsonWriterPipeline:
    4. def open_spider(self, spider):
    5. self.file = open('items.jl', 'w')
    6. def close_spider(self, spider):
    7. self.file.close()
    8. def process_item(self, item, spider):
    9. line = json.dumps(ItemAdapter(item).asdict()) + "\n"
    10. self.file.write(line)
    11. return item

    3、存储数据——将项目写入MongoDB

    1. import pymongo
    2. from itemadapter import ItemAdapter
    3. class MongoPipeline:
    4. collection_name = 'scrapy_items'
    5. def __init__(self, mongo_uri, mongo_db):
    6. self.mongo_uri = mongo_uri
    7. self.mongo_db = mongo_db
    8. @classmethod
    9. def from_crawler(cls, crawler):
    10. return cls(
    11. mongo_uri=crawler.settings.get('MONGO_URI'), #从settings读取配置
    12. mongo_db=crawler.settings.get('MONGO_DATABASE', 'items') #从settings读取配置
    13. )
    14. def open_spider(self, spider):
    15. self.client = pymongo.MongoClient(self.mongo_uri)
    16. self.db = self.client[self.mongo_db]
    17. def close_spider(self, spider):
    18. self.client.close()
    19. def process_item(self, item, spider):
    20. self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
    21. return item

    4、存储数据——项目截图

    此Item Pipeline生成一个请求,向本地运行的Splash实例(一个javascript渲染服务器)发出,以渲染Item中URL的屏幕截图。Item Pipeline使用协同程序将请求响应下载后,将屏幕截图保存到文件中,并将文件名添加到Item中。

    1. import hashlib
    2. from urllib.parse import quote
    3. import scrapy
    4. from itemadapter import ItemAdapter
    5. from scrapy.utils.defer import maybe_deferred_to_future
    6. class ScreenshotPipeline:
    7. SPLASH_URL = "http://localhost:8050/render.png?url={}"
    8. async def process_item(self, item, spider):
    9. adapter = ItemAdapter(item)
    10. encoded_item_url = quote(adapter["url"])
    11. screenshot_url = self.SPLASH_URL.format(encoded_item_url)
    12. request = scrapy.Request(screenshot_url)
    13. response = await maybe_deferred_to_future(spider.crawler.engine.download(request, spider))
    14. if response.status != 200:
    15. # Error happened, return item.
    16. return item
    17. # Save screenshot to file, filename will be hash of url.
    18. url = adapter["url"]
    19. url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
    20. filename = f"{url_hash}.png"
    21. with open(filename, "wb") as f:
    22. f.write(response.body)
    23. # Store filename in item.
    24. adapter["screenshot_filename"] = filename
    25. return item

    5、数据查重——重复筛选器

    1. from itemadapter import ItemAdapter
    2. from scrapy.exceptions import DropItem
    3. class DuplicatesPipeline:
    4. def __init__(self):
    5. self.ids_seen = set()
    6. def process_item(self, item, spider):
    7. adapter = ItemAdapter(item)
    8. if adapter['id'] in self.ids_seen:
    9. raise DropItem(f"Duplicate item found: {item!r}")
    10. else:
    11. self.ids_seen.add(adapter['id'])
    12. return item

    四、Item Pipeline的激活

    将Item Pipeline的类添加到ITEM_PIPELINES设置,在此设置中分配给类的整数值决定了运行顺序,由低到高。如下:

    1. ITEM_PIPELINES = {
    2. 'myproject.pipelines.PricePipeline': 300,
    3. 'myproject.pipelines.JsonWriterPipeline': 800,
    4. }

    更多爬虫知识以及实例源码,可关注微信公众号:angry_it_man

  • 相关阅读:
    TcpServer::start都做了些什么
    【白话前端】和three.js功能相近的8个js库
    java计算机毕业设计酒店预约入住系统源码+mysql数据库+系统+lw文档+部署
    hive逗号分割行列转换
    python返回多个值与赋值多个值
    C#.Net筑基-基础知识
    Linux中getopt函数、optind等变量使用详解
    【Swift 60秒】46 - Inout parameters
    [PostgreSql]生产级别数据库安装要考虑哪些问题?
    1018 锤子剪刀布
  • 原文地址:https://blog.csdn.net/xuyuanfan77/article/details/128045053