redis的list数据结构用来做一部消息队列,使用rpush/lpush操作入队列。 使用lpop/rpop来出队列。
> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)
如果队列空了,客户端会陷入pop的死循环,不停的pop,没有数据,接着pop,又没有数据。空轮询会拉高客户端的CPU,redisde QPS也会被拉高。
阻塞读在队列没有数据时,会立即进入休眠状态,一旦数据到来,则立刻醒来,消息的延迟几乎为0,用blpop/brpop替代前面的lpop/rpop。可解决上面问题。
如果线程一直阻塞,redis客户端链接成了闲置链接,久了服务器会自动断开链接,以减少资源的占用。此时blpop/brpop会抛出异常。
演示队列可以通过redis的zset(有序列表)来实现。我们将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset活到期任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不能被多次执行。
import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}
// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem task = new TaskItem();
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
task.msg = msg;
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
}
public void loop() {
while (!Thread.interrupted()) {
// 只取一条
Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500); // 歇会继续
}
catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) { // 抢到了
TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg);
}
}
}
public void handleMsg(T msg) {
System.out.println(msg);
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}
};
Thread consumer = new Thread() {
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
}
catch (InterruptedException e) {
}
}
}
因为没有ask机制,当消费端崩溃后消息丢失。pop出消息后,list 中就没这个消息了,如果处理消息的程序拿到消息还未处理就挂掉了,那消息就丢失了,所以是不可靠队列
1.Redis zset支持高性能的 score 排序。
2.Redis是在内存上进行操作的,速度非常快。
3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。
4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性