App 爬取数据相对于 Web 端爬取更加容易,因为反爬虫能力没有那么强,而且大多数的数据都是以 JSON形式传输的,解析更加简单。在 Web 端,我们可以通过浏览器的开发者工具监听到各个网络请求和响应过程, 在 App 端如果想要查看这些内容就需要借助抓包软件。常用的抓包软件有 WireShark、Filddler、Charles、 mitmproxy、 AnyProxy 等,它们的原理基本是相同的。 我们可以通过设置代理的方式将手机处于抓包 软件的监昕之下,这样便可以看到 App 在运行过程中发生的所有请求和响应了,相当于分析 Ajax 一 样。 如果这些请求的 URL、参数等都是有规律的,那么总结出规律直接用程序模拟爬取即可,如果它 们没有规律,那么我们可以利用另一个工具 mitmdump 对接 Python 脚本直接处理 Response。 另外, App 的爬取肯定不能由人来完成,也需要做到自动化,所以我们还要对 App 进行向动化控制, 这里用 到的库是 Airtest。
夜神模拟器(adb.exe)与Airtest(nox_adb.exe)中的adb版本需要完全相同,复制其中一个到别外一个文件中保证adb版本完全相同
夜神模拟器的adb位置:…\bin
Airtest的adb位置:…\airtest\core\android\static\adb\windows
开启开发者选项,打开USB调试
在夜神模拟器中配置好代理和安装好mitmdump 证书,正常监听如下图
mitmweb
能过Airtest连接夜神模拟器成功如下图所示
我遇到的坑
无法找到夜神模拟器adb进行连接,可能是adb版同不一样
adb一样了还是找不到,可能是没有开启USB调试
USB调试出开还是找不到,那就重启 夜神模拟器 和 airtest
mitmdump.exe -s .\mitmproxy\gg.py
代码如下(示例):
# 启动命令
# mitmdump.exe -s .\mitmproxy\gg.py
import json
from mitmproxy import ctx
import pymongo
# pymongo有自带的连接池和自动重连机制,但是仍需要捕捉AutoReconnect异常并重新发起请求。
from pymongo.errors import AutoReconnect
from retry import retry
# 指定 mongodb 的连接IP,库名,集合
MONGO_CONNECTION_STRING = 'mongodb://192.168.27.101:27017'
client = pymongo.MongoClient(MONGO_CONNECTION_STRING)
db = client['crawle_case']
collection = db['tsy_2']
'''
AutoReconnect:捕捉到该错误时进行重试,这个参数可以是一个元组,里面放上多个需要重试的条件
tries:重试次数
delay:两次重试的间隔时间
'''
@retry(AutoReconnect, tries=4, delay=1)
def save_data(data):
"""
将数据保存到 mongodb
使用 update_one() 方法修改文档中的记录。该方法第一个参数为查询的条件,第二个参数为要修改的字段。
upsert:
是一种特殊的更新,如果没有找到符合条件的更新条件的文档,就会以这个条件和更新文档为基础创建一个新的文档;如果找到了匹配的文档,就正常更新,upsert非常方便,不必预置集合,同一套代码既能用于创建文档又可以更新文档
"""
# # 存在则更新,不存在则新建,
# collection.update_one({
# # 保证 数据 是唯一的
# '游戏名ID': data.get('游戏名ID')
# }, {
# '$set': data
# }, upsert=True)
collection.insert_one(data)
def response(flow):
url = 'https://app.taoshouyou.com/api/trades/gettradeslist'
if flow.request.url.startswith(url):
text = flow.response.text
print(flow)
data = json.loads(text)
print(data)
for a in data.get('data').get('list'):
# print(a)
id = a.get('id')
name = a.get('name')
gamename = a.get('gamename')
shopname = a.get('shopname')
officialprice = a.get('officialprice')
discount = a.get('discount')
price = a.get('price')
areaname = a.get('areaname')
goodsname = a.get('goodsname')
clientname = a.get('clientname')
shopUrl = a.get('shopUrl')
wb_data = {
'游戏名ID': id,
'游戏标题': name,
'游戏名': gamename,
'店铺': shopname,
'原价': officialprice,
'折扣': discount,
'现价': price,
'区服': areaname,
'账号类型': goodsname,
'客服端': clientname,
'商品url': shopUrl,
}
save_data(wb_data)
代码如下(示例):
# -*- encoding=utf8 -*-
__author__ = "Administrator"
import time
from airtest.core.api import *
auto_setup(__file__)
while True:
gg1 = 426,375
gg2 = 426,1453
swipe(gg2,gg1,duration=0.01)
time.sleep(1)
以上就是今天要讲的内容,本文仅仅简单介绍了APP数据的简单采集,仅限于个人学习。