• Using an authenticated Redis cluster as pyspider's message queue


    Overview

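    I recently deployed a project with pyspider as the scheduler, and the client required a Redis cluster as the message queue. Most of what I found online said pyspider does not support Redis clusters. On reflection, such a common requirement seemed unlikely to be unsupported. Following the principle that the answer is always in the source code, I opened pyspider's source and found that Redis clusters are in fact supported, but only clusters that do not require authentication. So pyspider's code needs a small change to support an authenticated Redis cluster.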

    pyspider message_queue source code walkthrough

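    Start with redis_queue.py. The constructor of the RedisQueue class references StrictRedisCluster, which shows that pyspider does support Redis cluster mode. Unlike the single-node connection, however, the cluster connection never passes a password.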

    class RedisQueue(object):
        """
        A Queue like message built over redis
        """
    
        Empty = BaseQueue.Empty
        Full = BaseQueue.Full
        max_timeout = 0.3
    
        def __init__(self, name, host='localhost', port=6379, db=0,
                     maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):
            """
            Constructor for RedisQueue
    
            maxsize:    an integer that sets the upperbound limit on the number of
                        items that can be placed in the queue.
            lazy_limit: redis queue is shared via instance, a lazy size limit is used
                        for better performance.
            """
            self.name = name
            if(cluster_nodes is not None):
                from rediscluster import StrictRedisCluster
                self.redis = StrictRedisCluster(startup_nodes=cluster_nodes)
            else:
                self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
            self.maxsize = maxsize
            self.lazy_limit = lazy_limit
            self.last_qsize = 0
    

    Leaving authentication aside for a moment, how do we configure a cluster that has no password? pyspider is normally started with:

    pyspider -c pyspider.json

    So the program's entry point is the pyspider command. Tracing the code shows that message queue initialization happens in run.py:

        # ... excerpt from pyspider/run.py, preceding lines omitted ...
    
        # create folder for counter.dump
        if not os.path.exists(kwargs['data_path']):
            os.mkdir(kwargs['data_path'])
    
        # message queue, compatible with old version
        if kwargs.get('message_queue'):
            pass
        elif kwargs.get('amqp_url'):
            kwargs['message_queue'] = kwargs['amqp_url']
        elif os.environ.get('RABBITMQ_NAME'):
            kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                       ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
        elif kwargs.get('beanstalk'):
            kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']
    
        for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                     'fetcher2processor', 'processor2result'):
            if kwargs.get('message_queue'):
                kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                    name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
            else:
                kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                     kwargs['queue_maxsize'])
    
    

    connect_message_queue is the key function here. Following it further, we find it defined in the __init__.py of the message_queue package:

    def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):
        """
        create connection to message queue
    
        name:
            name of message queue
    
        rabbitmq:
            amqp://username:password@host:5672/%2F
            see https://www.rabbitmq.com/uri-spec.html
        beanstalk:
            beanstalk://host:11300/
        redis:
            redis://host:6379/db
            redis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)
        kombu:
            kombu+transport://userid:password@hostname:port/virtual_host
            see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
        builtin:
            None
        """
    
        if not url:
            from pyspider.libs.multiprocessing_queue import Queue
            return Queue(maxsize=maxsize)
    
        parsed = urlparse.urlparse(url)
        if parsed.scheme == 'amqp':
            from .rabbitmq import Queue
            return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
        elif parsed.scheme == 'beanstalk':
            from .beanstalk import Queue
            return Queue(name, host=parsed.netloc, maxsize=maxsize)
        elif parsed.scheme == 'redis':
            from .redis_queue import Queue
            if ',' in parsed.netloc:
                """
                redis in cluster mode (there is no concept of 'db' in cluster mode)
                ex. redis://host1:port1,host2:port2,...,hostn:portn
                """
                cluster_nodes = []
                for netloc in parsed.netloc.split(','):
                    cluster_nodes.append({'host': netloc.split(':')[0], 'port': int(netloc.split(':')[1])})
    
                return Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit, cluster_nodes=cluster_nodes)
    
            else:
                db = parsed.path.lstrip('/').split('/')
                try:
                    db = int(db[0])
                except:
                    logging.warning('redis DB must zero-based numeric index, using 0 instead')
                    db = 0
    
                password = parsed.password or None
    
                return Queue(name=name, host=parsed.hostname, port=parsed.port, db=db, maxsize=maxsize, password=password, lazy_limit=lazy_limit)
        elif url.startswith('kombu+'):
            url = url[len('kombu+'):]
            from .kombu_queue import Queue
            return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
        else:
            raise Exception('unknown connection url: %s', url)
    
    

    pyspider message_queue configuration

    Configuration for a Redis cluster

    The code above makes it clear that using Redis cluster mode takes two steps:

    • Install the Python Redis cluster client package
    pip install rediscluster
    
    • Configure pyspider.json as follows:
    {
     "message_queue": "redis://192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
    }
    

    Nodes are separated by commas (",").
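To make the parsing step concrete, here is a minimal standalone sketch of what the cluster branch of connect_message_queue does with such a URL. It uses Python 3's urllib.parse in place of pyspider's own urlparse import, and the function name cluster_nodes_from_url is hypothetical:

```python
from urllib.parse import urlparse


def cluster_nodes_from_url(url):
    """Mirror the cluster branch of connect_message_queue (no authentication)."""
    parsed = urlparse(url)
    nodes = []
    # every comma-separated host:port pair becomes one startup node
    for netloc in parsed.netloc.split(','):
        host, port = netloc.split(':')
        nodes.append({'host': host, 'port': int(port)})
    return nodes


if __name__ == '__main__':
    url = "redis://192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
    for node in cluster_nodes_from_url(url):
        print(node)
```

The resulting list of {'host', 'port'} dicts is what RedisQueue hands to the cluster client as startup_nodes.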

    The same code also shows that for a single Redis node with authentication, the configuration should look like this:

    message_queue configuration for a single authenticated Redis node

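    For a single node with authentication, the message_queue URL is redis:// followed by a colon, the password, "@", host:port and a numeric db index, i.e. redis://:password@host:port/db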

    {
     "message_queue": "redis://:redispass@192.168.0.1:5000/0"
    }
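The sketch below mirrors the single-node branch, assuming Python 3's urllib.parse; single_node_params is a hypothetical name. It also shows why the db index must be numeric: a path such as /db0 makes int() fail, and the code falls back to db 0 with a warning.

```python
import logging
from urllib.parse import urlparse


def single_node_params(url):
    """Mirror the single-node branch of connect_message_queue."""
    parsed = urlparse(url)
    db = parsed.path.lstrip('/').split('/')
    try:
        db = int(db[0])
    except ValueError:
        # same fallback as pyspider: non-numeric or missing db -> 0
        logging.warning('redis DB must zero-based numeric index, using 0 instead')
        db = 0
    return {
        'host': parsed.hostname,
        'port': parsed.port,
        'db': db,
        # urlparse takes the text between ':' and '@' as the password
        'password': parsed.password or None,
    }


if __name__ == '__main__':
    print(single_node_params("redis://:redispass@192.168.0.1:5000/0"))
```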
    

    Adding authenticated Redis cluster support to pyspider

    Following the single-node pattern, how could we support an authenticated Redis cluster with a similar configuration? That is, a configuration like this:

    {
     "message_queue": "redis://:redispass@192.168.0.1:6379,192.168.0.1:6380,192.168.0.2:6380,192.168.0.2:6379"
    }
    

    Supporting this format clearly requires code changes, starting with the place where the URL is parsed.
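Why does the unmodified code fail on this URL? urlparse keeps the userinfo inside netloc, so after splitting on commas the first node still begins with ":redispass@". A small illustration, assuming Python 3's urllib.parse:

```python
from urllib.parse import urlparse

url = ("redis://:redispass@192.168.0.1:6379,192.168.0.1:6380,"
       "192.168.0.2:6380,192.168.0.2:6379")
parsed = urlparse(url)

# netloc keeps the userinfo, so only the first node carries ":redispass@"
first_node = parsed.netloc.split(',')[0]
print(first_node)        # :redispass@192.168.0.1:6379
print(parsed.password)   # redispass

# the unmodified cluster branch does int(netloc.split(':')[1]) on every node;
# for the first node that is int('redispass@192.168.0.1'), which raises
try:
    int(first_node.split(':')[1])
except ValueError as exc:
    print('upstream parsing fails:', exc)
```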

    
    def connect_message_queue(name, url=None, maxsize=0, lazy_limit=True):
        """
        create connection to message queue
    
        name:
            name of message queue
    
        rabbitmq:
            amqp://username:password@host:5672/%2F
            see https://www.rabbitmq.com/uri-spec.html
        beanstalk:
            beanstalk://host:11300/
        redis:
            redis://host:6379/db
            redis://host1:port1,host2:port2,...,hostn:portn (for redis 3.x in cluster mode)
        kombu:
            kombu+transport://userid:password@hostname:port/virtual_host
            see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
        builtin:
            None
        """
    
        if not url:
            from pyspider.libs.multiprocessing_queue import Queue
            return Queue(maxsize=maxsize)
    
        parsed = urlparse.urlparse(url)
        if parsed.scheme == 'amqp':
            from .rabbitmq import Queue
            return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
        elif parsed.scheme == 'beanstalk':
            from .beanstalk import Queue
            return Queue(name, host=parsed.netloc, maxsize=maxsize)
        elif parsed.scheme == 'redis':
            from .redis_queue import Queue
            
            # parse the password out of the URL
            password = parsed.password or None
            
            if ',' in parsed.netloc:
                """
                redis in cluster mode (there is no concept of 'db' in cluster mode)
                ex. redis://host1:port1,host2:port2,...,hostn:portn
                """
                cluster_nodes = []
                for netloc in parsed.netloc.split(','):
                    # the first node still carries ":<password>@" from the URL, so strip it
                    if password is not None:
                        prefix = ":" + password + "@"
                        netloc = netloc.replace(prefix, "")
                    cluster_nodes.append({'host': netloc.split(':')[0], 'port': int(netloc.split(':')[1])})
                # pass the password to the cluster constructor
                return Queue(name=name, maxsize=maxsize, lazy_limit=lazy_limit, cluster_nodes=cluster_nodes, password=password)
    
            else:
                db = parsed.path.lstrip('/').split('/')
                try:
                    db = int(db[0])
                except:
                    logging.warning('redis DB must zero-based numeric index, using 0 instead')
                    db = 0
    
                password = parsed.password or None
    
                return Queue(name=name, host=parsed.hostname, port=parsed.port, db=db, maxsize=maxsize, password=password, lazy_limit=lazy_limit)
        elif url.startswith('kombu+'):
            url = url[len('kombu+'):]
            from .kombu_queue import Queue
            return Queue(name, url, maxsize=maxsize, lazy_limit=lazy_limit)
        else:
            raise Exception('unknown connection url: %s' % url)
    

    As the code shows, only a few lines change, so this part is straightforward.
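The modified cluster branch can be exercised on its own. The following sketch extracts it into a hypothetical helper, parse_cluster_url, that returns the node list and the password, again assuming Python 3's urllib.parse:

```python
from urllib.parse import urlparse


def parse_cluster_url(url):
    """Return (cluster_nodes, password) the way the modified branch computes them."""
    parsed = urlparse(url)
    password = parsed.password or None
    cluster_nodes = []
    for netloc in parsed.netloc.split(','):
        # only the first node carries ":<password>@"; strip it before splitting
        if password is not None:
            netloc = netloc.replace(':' + password + '@', '')
        host, port = netloc.split(':')
        cluster_nodes.append({'host': host, 'port': int(port)})
    return cluster_nodes, password
```

The replace() approach assumes the URL carries only a password, in the ":password@" form used above. If a username were ever included, taking parsed.netloc.rpartition('@')[2] before splitting would be a more general way to drop the userinfo; that variant is a suggestion, not part of the original change.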

    • The second change is in the RedisQueue constructor, which must pass the password when connecting to RedisCluster.
    class RedisQueue(object):
        """
        A Queue like message built over redis
        """
    
        Empty = BaseQueue.Empty
        Full = BaseQueue.Full
        max_timeout = 0.3
    
        def __init__(self, name, host='localhost', port=6379, db=0,
                     maxsize=0, lazy_limit=True, password=None, cluster_nodes=None):
            """
            Constructor for RedisQueue
    
            maxsize:    an integer that sets the upperbound limit on the number of
                        items that can be placed in the queue.
            lazy_limit: redis queue is shared via instance, a lazy size limit is used
                        for better performance.
            """
            self.name = name
            if(cluster_nodes is not None):
                # StrictRedisCluster was removed in newer releases of the cluster client, so use RedisCluster
                from rediscluster import RedisCluster
                # pass the password to the cluster constructor
                self.redis = RedisCluster(startup_nodes=cluster_nodes, password=password)
            else:
                self.redis = redis.StrictRedis(host=host, port=port, db=db, password=password)
            self.maxsize = maxsize
            self.lazy_limit = lazy_limit
            self.last_qsize = 0
    

    In total the change is under 10 lines, and it meets our requirement.

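    Also, because the rediscluster package is no longer maintained, we switch to a newer Redis cluster client. The rediscluster module imported in the code above is provided by redis-py-cluster, whose 2.x releases offer RedisCluster and require redis-py 3.x: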

    pip install redis==3.5.3
    pip install redis-py-cluster==2.1.3
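Before starting pyspider, a quick round trip against the cluster can confirm that the password is accepted. A minimal sketch, assuming redis-py-cluster 2.1.3 as installed above; smoke_test, connect_cluster and the key name pyspider:smoke-test are hypothetical. The connection is only opened when connect_cluster is called, so smoke_test also works with any object that has set/get/delete:

```python
def smoke_test(client, key='pyspider:smoke-test'):
    """Write, read back and delete one key; return True if the round trip works."""
    client.set(key, 'ok')
    try:
        value = client.get(key)
    finally:
        client.delete(key)
    # redis clients return bytes unless decode_responses=True is set
    if isinstance(value, bytes):
        value = value.decode()
    return value == 'ok'


def connect_cluster(cluster_nodes, password):
    """Build a redis-py-cluster 2.x client; needs a reachable cluster."""
    from rediscluster import RedisCluster
    return RedisCluster(startup_nodes=cluster_nodes, password=password)


# usage against a real cluster (hosts and password are the example values above):
# client = connect_cluster([{'host': '192.168.0.1', 'port': 6379}], 'redispass')
# print(smoke_test(client))
```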
    
  • Original article: https://blog.csdn.net/wangzhongyudie/article/details/126238480