现在有一批数据写入多台 Redis 相同 key 的队列中,需要消费 Redis 队列作为 Flink Source,为了提高可用性,下面基于 JedisPool 进行队列的消费。队列数据示例: 1,2,3,4,5、A,B,C,D,E,程序将字符串解析并 split(",") 然后分别写到下游。
由于数据量较大,所以同时写入 N 台 Redis 队列,key 均相同,注意这里是 JedisPool 不是 JedisCluster,需要区分二者的概念。
- def initJedisPool(host: String, port: Int): JedisPool = {
- val config = new JedisPoolConfig
- config.setMaxTotal(4)
- config.setMaxIdle(2)
- config.setMaxWaitMillis(1000)
- config.setTestOnBorrow(true)
- config.setTestOnReturn(true)
-
- jedisPool = new JedisPool(config, host, port)
- jedisPool
- }
需要导入 Jedis 依赖:
- <dependency>
- <groupId>redis.clientsgroupId>
- <artifactId>jedisartifactId>
- <version>2.7.2&