Scraping web pages without writing code? How does that work?


    Environment

    • Python 3.9.12

    Configuration file format

    Scraping a page is basically request, parse, request again, over and over. When the page structure is relatively fixed, the crawl steps can be described in a configuration file, so no code has to be written: to scrape a different page you only write a new config, never new code. For example:

    
    <root name="#name" version="#version" cmd="#cmd">
        <spider value="https://pypi.org/" id="source" expires="3600"/>
        <selector source="#source" id="selector1" value="main ul.unstyled li a.package-snippet" attr="href"/>
        <spider source="#selector1" value="https://pypi.org/" id="package" expires="3600"/>
        <selector source="#package" id="source1" value=".banner"/>
        <selector source="#source1" name="package-name" value="h1.package-header__name" attr="innerHTML"/>
        <selector source="#source1" id="cmd" value="span#pip-command" attr="innerHTML"/>
        <regexp source="selector[name='package-name']" id="reg1" value="(\S*)\s+(\S*)"/>
        <index id="name" source="#reg1" value="0"/>
        <index id="version" source="#reg1" value="1"/>
    </root>
    

    The configuration uses five kinds of tags:

    • root: each config file has exactly one root tag, wrapping everything else; its attributes map the keys of each result object to the nodes that produce the corresponding values;
    • spider: describes which page to fetch; the value attribute is the URL prefix, and expires is meant to describe how long a fetched result stays valid (not used yet);
    • selector: takes an XML string; value is a CSS selector, and the output is the attr attribute of every element matched by value (see the fragment after this list), where attr can be:
      • innerHTML: the element's inner text, with tags stripped,
      • omitted: the element's "outerHTML", i.e. the element serialized with its own tags,
      • anything else: that attribute of the element, e.g. attr="href" returns the href attribute;
    • regexp: a regular expression; the pattern must contain groups (()); it takes a str and returns a typing.Iterable;
    • index: since regexp returns a typing.Iterable rather than a str, this tag picks a single str element out of that Iterable.
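
    To make the attr variants concrete, here is a hypothetical selector fragment in the same format (the id values and CSS selectors are invented for illustration, not taken from the real config above):

    <selector source="#page" id="links" value="a.item" attr="href"/>
    <selector source="#page" id="titles" value="a.item" attr="innerHTML"/>
    <selector source="#page" id="markup" value="a.item"/>

    The first line collects each matched link's href attribute, the second its inner text, and the third the full serialized <a> element.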

    The full configuration shown above covers a simple scenario: it scrapes the name, version number, and install command of the Python packages listed at the bottom of the PyPI front page.
    With the config format settled, the next step is to work out how to parse and execute it.

    Code

    import json
    import re
    from typing import Iterable
    from bs4 import BeautifulSoup, Tag
    import requests
    
    
    def request(url: str, params: dict = None, json_body: dict = None) -> str:
        # "json_body" avoids shadowing the json module imported above
        try:
            resp = requests.get(url, params=params, json=json_body, timeout=90)
            return resp.text if resp.status_code == 200 else ''  # non-200 responses become an empty string
        except Exception as e:  # network errors also degrade to an empty string
            print(e)
            return ''
    
    
    def select(
        raw_xml: str, selector: str, attr: 'str|None' = None, single: bool = False
    ) -> 'str|list':
        if not raw_xml:
            return '' if single else []
        soup = BeautifulSoup(raw_xml, features="xml")  # the lxml XML parser handles both the config and fetched pages
        if single:
            element = soup.select_one(selector)
            elements = [element] if element is not None else []  # select_one returns None on no match
        else:
            elements = soup.select(selector)
        ret = []
        for element in elements:
            if attr == 'innerHTML':
                res = element.text
            elif attr is None:
                res = str(element)  # no attr: serialize the whole element, i.e. its "outerHTML"
            elif attr in element.attrs:
                res = element.attrs[attr]
            else:
                res = ''
            ret.append(res)
        if single:
            return ret[0] if len(ret) > 0 else ''
        return ret
    
    
    def match(string: str, pattern: str) -> 'list[tuple[str]]':
        if not pattern:
            return [(string, ), ] if string else []
        return re.findall(pattern, string, re.M)
    
    
    def json_parser(json_str: str) -> 'list|dict':
        if not json_str:
            return {}
        return json.loads(json_str)
    
    
    def index(obj: 'list|dict', key: 'int|str'):
        if isinstance(obj, str):
            return ''
        if isinstance(obj, (list, tuple)):
            key = int(key)
            return obj[key] if key < len(obj) else ''
        return obj[key] if key in obj else ''  # dict lookup covers json_parser output; in practice pages usually return HTML
    
    
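    # Each config tag becomes a Node in a dataflow graph: "source" attributes
    # define the edges, and calling a node pulls its upstream results and
    # applies this node's operation to them.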
    class Node:
        def __init__(
            self,
            method: str,
            attrs: dict = None,
            previous=None,
            next_=None,
        ) -> None:
            self.method = method.lower()
            self.previous = previous
            self.next = next_
            self.results = None
            self.visit_times = 0
            self.attrs = attrs or {}
            if 'id' in self.attrs:
                self.id = (self.method, self.attrs['id'])
            else:
                entries = sorted(self.attrs.items(), key=lambda item: item[0])
                self.id = (self.method, *entries)
    
        def __eq__(self, __o: object) -> bool:
            if type(__o) is not Node:
                return False
            return hash(self.id) == hash(__o.id)
    
        def __hash__(self) -> int:
            return hash(self.id)
    
        def __str__(self):
            # for readable debug printing
            return str({
                'method': self.method,
                'attrs': self.attrs,
                'id': hash(self.id),
                'previous': [hash(item) for item in self.previous] if self.previous else None,
                'next':   [hash(item) for item in self.next] if self.next else None,
            })
    
        def __repr__(self) -> str:
            return str(self)
    
        def __call__(self):
            # record that this node has been executed
            self.visit_times += 1
    
            # normalize upstream inputs into a flat list of values
            if self.previous is None:
                previous = []
            elif isinstance(self.previous, Iterable):
                previous = self.previous
            else:
                previous = [self.previous, ]
    
            values = []
            for node in previous:
                if isinstance(node, str):
                    values.append(node)
                elif isinstance(node.results, str):
                    values.append(node.results)
                else:
                    values.extend(node.results)
    
            # dispatch on the tag type
            if self.method == 'spider':
                self.results = self.__request(values)
            elif self.method == 'selector':
                self.results = self.__select(values)
            elif self.method == 'json':
                self.results = self.__json(values)
            elif self.method == 'regexp':
                self.results = self.__reg(values)
            elif self.method == 'index':
                self.results = self.__index(values)
    
        def __index(self, values: list):
            i = self.attrs['value'] if 'value' in self.attrs else None
            ret = []
            for obj in values:
                value = index(obj, i)
                ret.append(value)
            return ret
    
        def __reg(self, values: list):
            regexp = self.attrs['value'] if 'value' in self.attrs else None
            ret = []
            for text in values:
                group = match(str(text).strip(), regexp)
                ret.extend(group)
            return ret
    
        def __json(self, values: list):
            ret = []
            for json_text in values:
                obj = json_parser(json_text)
                ret.append(obj)
            return ret
    
        def __select(self, values: list):
            selector = self.attrs['value'] if 'value' in self.attrs else None
            attr = self.attrs['attr'] if 'attr' in self.attrs else None
            single = self.attrs['single'] if 'single' in self.attrs else None
            ret = []
            for xml in values:
                texts = select(xml, selector, attr, single)
                if single:
                    ret.append(texts)
                else:
                    ret.extend(texts)
            return ret
    
        def __request(self, raw_urls: list):
            prefix = self.attrs['value'] if 'value' in self.attrs else ''
            if not raw_urls:
                urls = [prefix, ]
            else:
                urls = [prefix+url for url in raw_urls]
            ret = []
            for url in urls:
                resp = request(str(url))
                ret.append(resp)
            return ret
    
    
    def select_all(root: Tag) -> dict:
        elements = root.select('*')
        ret = {}
        for element in elements:
            node = Node(element.name, element.attrs)
            ret[hash(node)] = node
        return ret
    
    
    def find_output_nodes(root: Tag) -> dict:
        dictionary = {}
        for key, value in root.attrs.items():
            element = root.select_one(value)
            if element is None:
                continue
            tag_name = element.name
            node = Node(tag_name, element.attrs)
            dictionary[key] = node
        return dictionary
    
    
    def build(src: str):
        with open(src, 'r', encoding="utf8") as file:
            read = file.read()
        soup = BeautifulSoup(read, features="xml")
        root = soup.select_one('root')
        node_pool = select_all(root)          # every tag in the config, keyed by node hash
        dictionary = find_output_nodes(root)  # output key -> node, resolved from root's attributes

        # walk backwards from the output nodes, wiring up previous/next edges
        hold = list(dictionary.values())
        tops = []
        while hold:
            new_hold = []
            for node in hold:
                selector = node.attrs['source'] if 'source' in node.attrs else None
                if not selector:
                    if node not in tops:
                        tops.append(node)
                    continue
                elements = root.select(selector)
                previous_list = []
                for element in elements:
                    previous = Node(element.name, element.attrs)
                    previous = node_pool.get(hash(previous))
                    if previous not in previous_list:
                        previous_list.append(previous)
    
                node.previous = previous_list
                for previous in previous_list:
                    if previous.next:
                        if node not in previous.next:
                            previous.next.append(node)
                    else:
                        previous.next = [node, ]
                new_hold.extend(previous_list)
            hold = set(new_hold)
        return tops, dictionary
    
    
    def run(node_list: 'list[Node]'):
        hold = node_list
        while hold:
            new_hold = []
            for node in hold:
                can_run = node.visit_times <= 0        # run each node at most once
                if node.previous and len(node.previous) > 1:
                    # with several upstream nodes, wait until all of them have run
                    for previous in node.previous:
                        if previous.visit_times <= 0:
                            can_run = False
                            break
                if can_run:
                    node()
                    new_hold.extend(node.next or [])
                else:
                    new_hold.append(node)
            hold = []
            for node in new_hold:
                if node not in hold:
                    hold.append(node)
    
    
    def main(path):
        tops, dictionary = build(path)
        run(tops)
        length = 0
        for node in dictionary.values():
            if not node.results:  # guard: a node may have produced nothing
                continue
            if not length:
                length = len(node.results)
            elif len(node.results) != length:  # every output column must have the same length
                return []
        ret = [{} for _ in range(length)]
        for key, node in dictionary.items():
            if not node.results:
                continue
            for i in range(length):
                ret[i][key] = node.results[i]
        return ret
    
    
    if __name__ == '__main__':
        result = main('./data.xml')
        print(result)
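
    The helper functions can also be sanity-checked in isolation, without any network access. A minimal sketch (the HTML snippet and the expected outputs in the comments are invented for illustration):

    # stand-alone check of select(), match() and index()
    snippet = '<div class="banner"><h1 class="package-header__name">demo 1.0</h1></div>'
    title = select(snippet, 'h1.package-header__name', attr='innerHTML', single=True)
    print(title)                                     # demo 1.0
    groups = match(title, r'(\S*)\s+(\S*)')
    print(groups)                                    # [('demo', '1.0')]
    print(index(groups[0], 0), index(groups[0], 1))  # demo 1.0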
    

    Output

    [
        {
            "name": "jhashcode-python3",
            "version": "0.4",
            "cmd": "pip install jhashcode-python3"
        },
        {
            "name": "django-registration-bootstrap",
            "version": "0.2.0",
            "cmd": "pip install django-registration-bootstrap"
        },
        {
            "name": "talisker",
            "version": "0.21.1",
            "cmd": "pip install talisker"
        },
        {
            "name": "AgileP",
            "version": "1.0.0",
            "cmd": "pip install AgileP"
        },
        {
            "name": "odoo12-addon-project-role",
            "version": "12.0.1.1.1",
            "cmd": "pip install odoo12-addon-project-role"
        },
        {
            "name": "Remilia",
            "version": "2022.9.25.6.29.15",
            "cmd": "pip install Remilia"
        },
        {
            "name": "anime-api",
            "version": "0.13.1",
            "cmd": "pip install anime-api"
        },
        {
            "name": "remindmail",
            "version": "2022.9.25.2",
            "cmd": "pip install remindmail"
        },
        {
            "name": "typesense-orm",
            "version": "0.0.14",
            "cmd": "pip install typesense-orm"
        },
        {
            "name": "mariner-rrx",
            "version": "1.1.6",
            "cmd": "pip install mariner-rrx"
        }
    ]
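
    Incidentally, the code also implements a json tag (the 'json' branch in Node.__call__) that the PyPI config never uses. A hypothetical sketch of scraping a JSON endpoint with config only (the URL and field name are invented, and it assumes the endpoint returns an object like {"name": "..."}):

    <root name="#n">
        <spider value="https://example.com/api/item" id="api"/>
        <json source="#api" id="parsed"/>
        <index id="n" source="#parsed" value="name"/>
    </root>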
    
  • Original article: https://blog.csdn.net/dscn15848078969/article/details/127037971