• A proxy IP pool built from a backend service and a crawler


    Building a free proxy IP pool


    Problems to solve:

    1. How to store the IPs

      • Plain files

        Drawback: opening and rewriting a file for every update is clumsy

      • MySQL

        Drawback: queries are comparatively slow

      • MongoDB

        Drawback: queries are comparatively slow, and there is no built-in deduplication

      • redis --> redis is the best fit for storage

      So -> use a redis zset (sorted set) as the data structure (see the sketch after this list)

    2. Which websites to fetch IPs from

    3. How to structure the project
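
    Why a zset? Each IP is a member whose score doubles as a quality ranking, members are unique by construction, and range queries by score are built in. A minimal sketch, assuming a local redis on the default port:

    from redis import Redis

    r = Redis(decode_responses=True)
    r.zadd('proxy_ip', {'1.2.3.4:8080': 10})     # insert with an initial score
    r.zadd('proxy_ip', {'1.2.3.4:8080': 10})     # same member again: still one entry (but the
                                                 # score is overwritten, hence the existence
                                                 # check in add_ip later)
    print(r.zscore('proxy_ip', '1.2.3.4:8080'))  # 10.0
    r.zincrby('proxy_ip', -1, '1.2.3.4:8080')    # adjust the quality score in place
    print(r.zrangebyscore('proxy_ip', 1, 100))   # query by score range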

    Project architecture

    1. Collect the IPs
    2. Filter the IPs
    3. Verify that the IPs work
    4. Serve the IPs through an API

    Project structure diagram (image omitted).

    Project code

    The code folder

    redis_proxy.py

    # -*- encoding:utf-8 -*-
    # @time: 2022/7/4 11:32
    # @author: Maxs_hu
    """
    The redis middleman: encapsulates all interaction between redis and the proxy IPs.
    """
    from redis import Redis
    import random
    
    
    class RedisProxy:
        def __init__(self):
            # Connect to the redis database
            self.red = Redis(
                host='localhost',
                port=6379,
                db=9,
                password='123456',
                decode_responses=True
            )
    
        # 1. Store IPs in redis, checking for existence first so an existing
        #    IP's score is not clobbered.
        # 2. Return all stored IPs so they can be verified.
        # 3. Verify availability: a working IP gets full marks, a failing one loses a point.
        # 4. Hand a usable IP back to the caller:
        #       prefer full-score IPs,
        #       then anything that still has a score,
        #       otherwise return nothing.
    
        def add_ip(self, ip):  # called from outside with an "ip:port" string
            # Only add the IP if it is not already in redis
            # (zscore returns None for a missing member)
            if self.red.zscore('proxy_ip', ip) is None:
                self.red.zadd('proxy_ip', {ip: 10})
                print('stored in proxy_ip:', ip)
            else:
                print('duplicate, skipped:', ip)
    
        def get_all_proxy(self):
            # Return every stored IP
            return self.red.zrange('proxy_ip', 0, -1)
    
        def set_max_score(self, ip):
            self.red.zadd('proxy_ip', {ip: 100})  # note the {member: score} mapping syntax
    
        def deduct_score(self, ip):
            # Look up the current score first
            score = self.red.zscore('proxy_ip', ip)
            # If the IP still has a positive score, deduct one point
            if score is not None and score > 0:
                self.red.zincrby('proxy_ip', -1, ip)
            else:
                # The score has hit zero (or the IP is already gone): delete it
                self.red.zrem('proxy_ip', ip)
    
        def effect_ip(self):
            # First pick from the IPs with a full score
            ips = self.red.zrangebyscore('proxy_ip', 100, 100, 0, -1)
            if ips:
                return random.choice(ips)
            else:  # no full-score IPs
                # Fall back to IPs scored 11-99, i.e. above the untested initial 10
                ips = self.red.zrangebyscore('proxy_ip', 11, 99, 0, -1)
                if ips:
                    return random.choice(ips)
                else:
                    print('no usable ip')
                    return None
    
    
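    A quick sanity check of the scoring lifecycle (a minimal sketch, assuming the redis instance configured above is running):

    from redis_proxy import RedisProxy

    red = RedisProxy()
    red.add_ip('1.2.3.4:8080')         # a new IP enters the pool with a score of 10
    red.set_max_score('1.2.3.4:8080')  # verification passed: score jumps to 100
    print(red.effect_ip())             # '1.2.3.4:8080', the only full-score IP
    red.deduct_score('1.2.3.4:8080')   # one failed verification: score drops to 99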

    ip_collection.py

    # -*- encoding:utf-8 -*-
    # @time: 2022/7/4 11:32
    # @author: Maxs_hu
    """
    Collects proxy IPs from several free-proxy sites.
    """
    from redis_proxy import RedisProxy
    import requests
    from lxml import etree
    from multiprocessing import Process
    import time
    import random
    
    
    def get_kuai_ip(red):
        url = "https://free.kuaidaili.com/free/intr/"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers)
        et = etree.HTML(resp.text)
        trs = et.xpath('//table//tr')
        for tr in trs:
            ip = tr.xpath('./td[1]/text()')
            port = tr.xpath('./td[2]/text()')
            if not ip:  # skip rows without an ip cell (e.g. header rows)
                continue
            proxy_ip = ip[0] + ":" + port[0]
            red.add_ip(proxy_ip)
    
    
    def get_unknown_ip(red):
        url = "https://ip.jiangxianli.com/"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers)
        et = etree.HTML(resp.text)
        trs = et.xpath('//table//tr')
        for tr in trs:
            ip = tr.xpath('./td[1]/text()')
            port = tr.xpath('./td[2]/text()')
            if not ip:  # skip rows without an ip cell (e.g. header rows)
                continue
            proxy_ip = ip[0] + ":" + port[0]
            red.add_ip(proxy_ip)
    
    
    def get_happy_ip(red):
        page = random.randint(1, 5)
        url = f'http://www.kxdaili.com/dailiip/2/{page}.html'
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers)
        et = etree.HTML(resp.text)
        trs = et.xpath('//table//tr')
        for tr in trs:
            ip = tr.xpath('./td[1]/text()')
            port = tr.xpath('./td[2]/text()')
            if not ip:  # skip rows without an ip cell (e.g. header rows)
                continue
            proxy_ip = ip[0] + ":" + port[0]
            red.add_ip(proxy_ip)
    
    
    def get_nima_ip(red):
        url = 'http://www.nimadaili.com/'
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers)
        et = etree.HTML(resp.text)
        trs = et.xpath('//table//tr')
        for tr in trs:
            # the first cell already holds "ip:port" on this site; some rows are
            # empty, so don't index with [0] before checking
            ip = tr.xpath('./td[1]/text()')
            if not ip:
                continue
            red.add_ip(ip[0])
    
    
    def get_89_ip(red):
        page = random.randint(1, 26)
        url = f'https://www.89ip.cn/index_{page}.html'
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers)
        et = etree.HTML(resp.text)
        trs = et.xpath('//table//tr')
        for tr in trs:
            ip = tr.xpath('./td[1]/text()')
            port = tr.xpath('./td[2]/text()')  # the port lives in the second column here
            if not ip:
                continue
            red.add_ip(ip[0].strip() + ":" + port[0].strip())
    
    
    def main():
        # Create a RedisProxy instance
        red = RedisProxy()
        print("starting collection")
        while 1:
            try:
                # more source sites can be added here
                print('>>> collecting kuaidaili IPs')
                get_kuai_ip(red)  # kuaidaili
                # get_unknown_ip(red)  # jiangxianli
                print(">>> collecting kxdaili IPs")
                get_happy_ip(red)  # kxdaili
                # print(">>> collecting nimadaili IPs")
                # get_nima_ip(red)  # nimadaili
                print(">>> collecting 89ip IPs")
                get_89_ip(red)
                time.sleep(60)
            except Exception as e:
                print('error while storing IPs:', e)
                time.sleep(60)
    
    
    if __name__ == '__main__':
        main()
        # alternatively, run in a child process:
        # p = Process(target=main)
        # p.start()
    
    
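    All four collectors repeat the same fetch-table-parse loop, so they could be collapsed into one helper. A refactoring sketch (the helper name and parameters are illustrative, not part of the original project; it assumes each source lists proxies in an HTML table with the IP in column 1 and, optionally, the port in column 2):

    import requests
    from lxml import etree

    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                      " (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
    }


    def collect_table_ips(red, url, with_port=True):
        resp = requests.get(url, headers=HEADERS, timeout=10)
        et = etree.HTML(resp.text)
        for tr in et.xpath('//table//tr'):
            ip = tr.xpath('./td[1]/text()')
            if not ip:  # skip header/empty rows
                continue
            if with_port:
                port = tr.xpath('./td[2]/text()')
                red.add_ip(ip[0].strip() + ":" + port[0].strip())
            else:
                red.add_ip(ip[0].strip())  # cell already holds "ip:port"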

    ip_verify.py

    # -*- encoding:utf-8 -*-
    # @time: 2022/7/4 11:34
    # @author: Maxs_hu
    """
    Verifies that the stored IPs still work, using coroutines (aiohttp) for throughput.
    """
    from redis_proxy import RedisProxy
    from multiprocessing import Process
    import asyncio
    import aiohttp
    import time
    
    
    async def verify_ip(ip, red, sem):
        timeout = aiohttp.ClientTimeout(total=10)  # give each request at most ten seconds
        try:
            async with sem:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url='http://www.baidu.com/',
                                           proxy='http://' + ip,
                                           timeout=timeout) as resp:
                        await resp.text()  # drain the body so the request completes
                        if resp.status in [200, 302]:
                            # usable: raise the score to the maximum
                            red.set_max_score(ip)
                            print('verified ok, score maxed:', ip)
                        else:
                            # not usable: deduct a point
                            red.deduct_score(ip)
                            print('bad ip, one point deducted:', ip)
        except Exception as e:
            print('verification error:', e)
            red.deduct_score(ip)
            print('bad ip, one point deducted:', ip)
    
    
    async def task(red):
        ips = red.get_all_proxy()
        sem = asyncio.Semaphore(30)  # cap concurrency at 30 in-flight requests
        tasks = []
        for ip in ips:
            tasks.append(asyncio.create_task(verify_ip(ip, red, sem)))
        if tasks:
            await asyncio.wait(tasks)
    
    
    def main():
        red = RedisProxy()
        time.sleep(5)  # initial wait, giving the collector time to store some IPs
        print("starting verification")
        while 1:
            try:
                asyncio.run(task(red))
                time.sleep(100)
            except Exception as e:
                print("ip_verify error:", e)
                time.sleep(100)
    
    
    if __name__ == '__main__':
        main()
        # alternatively, run in a child process:
        # p = Process(target=main)
        # p.start()
    
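    One caveat: RedisProxy uses the synchronous redis client, so set_max_score and deduct_score briefly block the event loop while many coroutines are in flight. A sketch of one way around it, assuming Python 3.9+ (asyncio.to_thread; this wrapper is my addition, not part of the original project):

    import asyncio

    async def mark_good(red, ip):
        # run the blocking redis call on a worker thread
        await asyncio.to_thread(red.set_max_score, ip)

    async def mark_bad(red, ip):
        await asyncio.to_thread(red.deduct_score, ip)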

    ip_api.py

    # -*- encoding:utf-8 -*-
    # @time: 2022/7/4 11:35
    # @author: Maxs_hu
    
    """
    Serves IPs to users through a small web backend: anyone who hits our server
    gets back a working proxy IP. Candidate frameworks:
       1. flask
       2. sanic --> used here; it is slightly simpler for this purpose
    """
    from redis_proxy import RedisProxy
    from sanic import Sanic, json
    from sanic_cors import CORS
    from multiprocessing import Process
    
    # Create an app (the name is arbitrary)
    app = Sanic('ip')
    # Allow cross-origin requests
    CORS(app)
    red = RedisProxy()
    
    
    @app.route('/maxs_hu_ip')  # register the route
    async def api(req):  # the first parameter is always the request object
        ip = red.effect_ip()
        return json({"ip": ip})
    
    
    def main():
        # start sanic
        app.run(host='127.0.0.1', port=1234)
    
    
    if __name__ == '__main__':
        main()
        # p = Process(target=main)
        # p.start()
    
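    Once the service is up, any HTTP client can ask it for a proxy, e.g.:

    import requests

    resp = requests.get('http://127.0.0.1:1234/maxs_hu_ip')
    print(resp.json())  # e.g. {"ip": "1.2.3.4:8080"}, or {"ip": null} if the pool is empty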

    runner.py

    # -*- encoding:utf-8 -*-
    # @time: 2022/7/5 17:36
    # @author: Maxs_hu
    from ip_api import main as api_run
    from ip_collection import main as coll_run
    from ip_verify import main as veri_run
    from multiprocessing import Process
    
    
    def main():
        # three independent processes that do not interfere with one another
        p1 = Process(target=api_run)  # pass the function object itself, not a call
        p2 = Process(target=coll_run)
        p3 = Process(target=veri_run)
    
        p1.start()
        p2.start()
        p3.start()
    
    
    if __name__ == '__main__':
        main()
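
    As written, main() returns as soon as the children are started; the non-daemon child processes keep the program alive on their own. Explicit joins make the lifetime easier to reason about. A variant sketch (my addition, not in the original):

    from multiprocessing import Process

    def run_all(targets):
        procs = [Process(target=t) for t in targets]
        for p in procs:
            p.start()
        for p in procs:
            p.join()  # block until each child exits, so shutdown is explicit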
    

    测试ip是否可用.py (a quick script to test whether the served IP works)

    # -*- encoding:utf-8 -*-
    # @time: 2022/7/5 18:15
    # @author: Maxs_hu
    import requests
    
    
    def get_proxy():
        url = "http://127.0.0.1:1234/maxs_hu_ip"
        resp = requests.get(url)
        return resp.json()
    
    
    def main():
        proxy_ip = get_proxy()["ip"]  # fetch once, so the queried ip and the proxy used match
        url = 'http://mip.chinaz.com/?query=' + proxy_ip
        proxies = {
            "http": 'http://' + proxy_ip,
            "https": 'http://' + proxy_ip  # these free proxies only support plain http
        }
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36",
        }
        resp = requests.get(url, proxies=proxies, headers=headers)
        resp.encoding = 'utf-8'
        print(resp.text)  # the page reports the ip's physical location
    
    
    if __name__ == '__main__':
        main()
    

    Running the project

    Screenshot of the project running (image omitted).

    Screenshot of the data stored in redis (image omitted).

    Summary

    1. Free proxy IPs only work for plain-http pages and are not very reliable; if you have real needs, buy paid proxies and feed them into the pool the same way.
    2. The service can be deployed on your own server so others can call it; with full-stack skills you could later add login and paid tiers to extend it.
    3. The architecture is a producer-consumer model: the three modules run at the same time, one process each, without interfering with one another.
    4. Some design details still need polishing, but the overall result runs well; problems can be fixed as they surface.