- //获取一个流
- RStream rStream = redissonClient.getStream("testStream");
- //创建一个map,添加数据
- Map
rr = new HashMap<>(); - rr.put("xx", RandomUtil.randomString(5));
- //添加到流
- rStream.addAll(rr);
rStream支持的add函数如下:
2、sub端sub端订阅的方法有三种:
1、调用read方法

2、调用range方法

3、调用readGroup方法

基于组的订阅如下:
- RStream rStream = redissonClient.getStream("testStream");
- //创建分组
- rStream.createGroup("default",StreamMessageId.ALL);
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- //读取default分组中,消费者名词为consumer_1,每次读取三个,30秒阻塞。
- Map
> ss = rStream.readGroup("default", "consumer_1", 3, 30, TimeUnit.SECONDS); - Map
> ss = rStream.readGroup("default","1234",3,StreamMessageId.ALL); - for (StreamMessageId streamMessageId : ss.keySet()) {
- System.out.println(streamMessageId.toString()+"__"+ss.get(streamMessageId));
- rStream.remove(streamMessageId);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
分组的好处是,假如有10个订阅者同时在一个分组,那么分组中的消息只会给其中某一个订阅者。