Scraping a page is basically: request, parse, request again, over and over. When the page structure is relatively fixed, the scraping steps can be described in a configuration file so that no code has to be written; to scrape a different page you only write config, not code. So:
<root name="#name" version="#version" cmd="#cmd">
    <spider value="https://pypi.org/" id="source" expires="3600"/>
    <selector source="#source" id="selector1" value="main ul.unstyled li a.package-snippet" attr="href"/>
    <spider source="#selector1" value="https://pypi.org/" id="package" expires="3600"/>
    <selector source="#package" id="source1" value=".banner"/>
    <selector source="#source1" name="package-name" value="h1.package-header__name" attr="innerHTML"/>
    <selector source="#source1" id="cmd" value="span#pip-command" attr="innerHTML"/>
    <regexp source="selector[name='package-name']" id="reg1" value="(\S*)\s+(\S*)"/>
    <index id="name" source="#reg1" value="0"/>
    <index id="version" source="#reg1" value="1"/>
</root>
The config uses five kinds of tags:
- root: a config file has exactly one root tag, wrapping everything else; its attributes are the key-value mapping for the elements of the result list.
- spider: describes which page to fetch; the value attribute is the path prefix, and expires describes how long the fetched result stays valid (not used yet).
- selector: takes an xml string; the value attribute is a CSS selector, and the output is the attr attribute of each element matched by value. attr can be innerHTML, which takes the element's inner text without tags, or an attribute name, e.g. attr="href" takes the href attribute.
- regexp: a regular expression; the pattern must contain groups (()). It accepts a str and returns a typing.Iterable.
- index: as just noted, a regexp yields a typing.Iterable rather than a str; this tag selects a single str element out of that typing.Iterable (see the sketch after this list).
The config above covers a simple scenario: scraping the name, version number, and install command of the Python packages listed near the bottom of the pypi homepage.
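To make the regexp and index steps concrete, here is what they amount to in plain Python (a minimal sketch; the sample string is made up):

import re

# The pattern must contain groups, so re.findall returns an iterable of
# tuples rather than plain strings; index then picks one element per tuple.
groups = re.findall(r"(\S*)\s+(\S*)", "talisker 0.21.1")
print(groups)        # [('talisker', '0.21.1')]
print(groups[0][0])  # 'talisker' -> the "name" field
print(groups[0][1])  # '0.21.1'   -> the "version" field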
Now that the config file exists, the next step is working out how to parse it.
import json
import re
from typing import Iterable
from bs4 import BeautifulSoup, Tag
import requests


def request(url: str, params: dict = None, json: dict = None) -> str:
    try:
        resp = requests.get(url, params=params, json=json, timeout=90)
        return resp.text if resp.status_code == 200 else ''  # non-200 responses come back as an empty string
    except Exception as e:  # swallow request errors and degrade to an empty string
        print(e)
        return ''

def select(
    raw_xml: str, selector: str, attr: 'str|None' = None, single: bool = False
) -> 'str|list':
    if not raw_xml:
        return '' if single else []
    soup = BeautifulSoup(raw_xml, features="xml")
    if single:
        element = soup.select_one(selector)
        elements = [element] if element is not None else []  # select_one may find nothing
    else:
        elements = soup.select(selector)
    ret = []
    for element in elements:
        if attr == 'innerHTML':
            res = element.text
        elif attr is None:
            res = str(element)  # no attr given: take the "outerHTML"
        elif attr in element.attrs:
            res = element.attrs[attr]
        else:
            res = ''
        ret.append(res)
    if single:
        return ret[0] if len(ret) > 0 else ''
    return ret

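A quick check of select() on a hand-written fragment (the markup is made up, loosely modeled on pypi's package snippet; the xml parser requires lxml):

fragment = (
    '<li><a class="package-snippet" href="/project/talisker/">'
    '<h1 class="package-header__name">talisker 0.21.1</h1></a></li>'
)
print(select(fragment, 'a.package-snippet', attr='href'))        # ['/project/talisker/']
print(select(fragment, 'h1.package-header__name', 'innerHTML'))  # ['talisker 0.21.1']
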
def match(string: str, pattern: str) -> 'list[tuple[str]]':
    if not pattern:
        return [(string, ), ] if string else []
    return re.findall(pattern, string, re.M)

def json_parser(json_str: str) -> 'list|dict':
    if not json_str:
        return {}
    return json.loads(json_str)

def index(obj: 'list|dict', key: 'int|str'):
    if isinstance(obj, str):
        return ''
    if isinstance(obj, (list, tuple)):
        key = int(key)
        return obj[key] if key < len(obj) else ''
    # the dict branch exists because json_parser can return a dict, even though pages here always return html
    return obj[key] if key in obj else ''

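A couple of throwaway calls show how index() degrades to an empty string instead of raising:

print(index(('talisker', '0.21.1'), '1'))  # '0.21.1' -> one group out of a regexp tuple
print(index({'a': 1}, 'b'))                # ''       -> missing dict key
print(index('plain text', 0))              # ''       -> plain strings are not indexed
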
class Node:
    def __init__(
        self,
        method: str,
        attrs: dict = None,
        previous=None,
        next_=None,
    ) -> None:
        self.method = method.lower()
        self.previous = previous
        self.next = next_
        self.results = None
        self.visit_times = 0
        self.attrs = attrs or {}
        # identity: (method, id) when an id is given, otherwise (method, sorted attrs)
        if 'id' in self.attrs:
            self.id = (self.method, self.attrs['id'])
        else:
            entries = sorted(self.attrs.items(), key=lambda item: item[0])
            self.id = (self.method, *entries)

    def __eq__(self, __o: object) -> bool:
        if type(__o) is not Node:
            return False
        return hash(self.id) == hash(__o.id)

    def __hash__(self) -> int:
        return hash(self.id)

    def __str__(self):
        # readable printing
        return str({
            'method': self.method,
            'attrs': self.attrs,
            'id': hash(self.id),
            'previous': [hash(item) for item in self.previous] if self.previous else None,
            'next': [hash(item) for item in self.next] if self.next else None,
        })

    def __repr__(self) -> str:
        return str(self)

    def __call__(self):
        # record the call on this node first
        self.visit_times += 1
        # normalize the input into a list of predecessors
        if self.previous is None:
            previous = []
        elif isinstance(self.previous, Iterable):
            previous = self.previous
        else:
            previous = [self.previous, ]
        values = []
        for node in previous:
            if isinstance(node, str):
                values.append(node)
            elif isinstance(node.results, str):
                values.append(node.results)
            else:
                values.extend(node.results)
        # dispatch on the operation type
        if self.method == 'spider':
            self.results = self.__request(values)
        elif self.method == 'selector':
            self.results = self.__select(values)
        elif self.method == 'json':
            self.results = self.__json(values)
        elif self.method == 'regexp':
            self.results = self.__reg(values)
        elif self.method == 'index':
            self.results = self.__index(values)

    def __index(self, values: list):
        i = self.attrs['value'] if 'value' in self.attrs else None
        ret = []
        for obj in values:
            value = index(obj, i)
            ret.append(value)
        return ret

    def __reg(self, values: list):
        regexp = self.attrs['value'] if 'value' in self.attrs else None
        ret = []
        for text in values:
            group = match(str(text).strip(), regexp)
            ret.extend(group)
        return ret

    def __json(self, values: list):
        ret = []
        for json_text in values:
            obj = json_parser(json_text)
            ret.append(obj)
        return ret

    def __select(self, values: list):
        selector = self.attrs['value'] if 'value' in self.attrs else None
        attr = self.attrs['attr'] if 'attr' in self.attrs else None
        single = self.attrs['single'] if 'single' in self.attrs else None
        ret = []
        for xml in values:
            texts = select(xml, selector, attr, single)
            if single:
                ret.append(texts)
            else:
                ret.extend(texts)
        return ret

    def __request(self, raw_urls: list):
        prefix = self.attrs['value'] if 'value' in self.attrs else ''
        if not raw_urls:
            urls = [prefix, ]
        else:
            urls = [prefix + url for url in raw_urls]
        ret = []
        for url in urls:
            resp = request(str(url))
            ret.append(resp)
        return ret

def select_all(root: Tag) -> dict:
    # collect every tag under root into a pool keyed by node hash
    elements = root.select('*')
    ret = {}
    for element in elements:
        node = Node(element.name, element.attrs)
        ret[hash(node)] = node
    return ret

def find_output_nodes(root: Tag) -> dict:
    # map each output key on <root> to the node its selector points at
    dictionary = {}
    for key, value in root.attrs.items():
        element = root.select_one(value)
        if element is None:
            continue
        tag_name = element.name
        node = Node(tag_name, element.attrs)
        dictionary[key] = node
    return dictionary

def build(src: str):
    with open(src, 'r', encoding="utf8") as file:
        read = file.read()
    soup = BeautifulSoup(read, features="xml")
    root = soup.select_one('root')
    node_pool = select_all(root)
    dictionary = find_output_nodes(root)
    hold = list(dictionary.values())
    tops = []
    while hold:
        new_hold = []
        for node in hold:
            selector = node.attrs['source'] if 'source' in node.attrs else None
            if not selector:
                # a node without a source is an entry point of the graph
                if node not in tops:
                    tops.append(node)
                continue
            elements = root.select(selector)
            previous_list = []
            for element in elements:
                previous = Node(element.name, element.attrs)
                previous = node_pool.get(hash(previous))  # reuse the pooled instance
                if previous not in previous_list:
                    previous_list.append(previous)
            node.previous = previous_list
            for previous in previous_list:
                if previous.next:
                    if node not in previous.next:
                        previous.next.append(node)
                else:
                    previous.next = [node, ]
            new_hold.extend(previous_list)
        hold = set(new_hold)
    return tops, dictionary

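For the sample config, the roots and output mapping that build() produces can be sanity-checked like this (assuming the XML above is saved as ./data.xml):

tops, outputs = build('./data.xml')
print([node.method for node in tops])  # ['spider'] -> the node with id="source"
print(list(outputs))                   # ['name', 'version', 'cmd']
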
def run(node_list: 'list[Node]'):
    hold = node_list
    while hold:
        new_hold = []
        for node in hold:
            can_run = node.visit_times <= 0
            if node.previous and len(node.previous) > 1:
                # with several inputs, wait until every one of them has run
                for previous in node.previous:
                    if previous.visit_times <= 0:
                        can_run = False
                        break
            if can_run:
                node()
                new_hold.extend(node.next or [])
            else:
                new_hold.append(node)
        hold = []
        for node in new_hold:
            if node not in hold:
                hold.append(node)

def main(path):
    tops, dictionary = build(path)
    run(tops)
    # every output node must yield the same number of results
    length = 0
    for node in dictionary.values():
        if node.results and not length:
            length = len(node.results)
            continue
        if len(node.results or []) != length:
            return []
    # zip the per-key result lists into a list of dicts
    ret = [{} for _ in range(length)]
    for key, node in dictionary.items():
        if not node.results:
            continue
        for i in range(length):
            ret[i][key] = node.results[i]
    return ret

if __name__ == '__main__':
    result = main('./data.xml')
    print(result)

Running it against the config above yields something like the following (pretty-printed; the package list on the homepage changes over time):
[
{
"name": "jhashcode-python3",
"version": "0.4",
"cmd": "pip install jhashcode-python3"
},
{
"name": "django-registration-bootstrap",
"version": "0.2.0",
"cmd": "pip install django-registration-bootstrap"
},
{
"name": "talisker",
"version": "0.21.1",
"cmd": "pip install talisker"
},
{
"name": "AgileP",
"version": "1.0.0",
"cmd": "pip install AgileP"
},
{
"name": "odoo12-addon-project-role",
"version": "12.0.1.1.1",
"cmd": "pip install odoo12-addon-project-role"
},
{
"name": "Remilia",
"version": "2022.9.25.6.29.15",
"cmd": "pip install Remilia"
},
{
"name": "anime-api",
"version": "0.13.1",
"cmd": "pip install anime-api"
},
{
"name": "remindmail",
"version": "2022.9.25.2",
"cmd": "pip install remindmail"
},
{
"name": "typesense-orm",
"version": "0.0.14",
"cmd": "pip install typesense-orm"
},
{
"name": "mariner-rrx",
"version": "1.1.6",
"cmd": "pip install mariner-rrx"
}
]