1、消息推送
- /**
- * @Auther: pshdhx
- * @Date: 2023/02/22/10:38
- * @Description: 往同一个stream队列里边塞值,同一队列的所有消费者组,都会收到消息
- * 模拟 消息推送到服务器
- */
- public class TestPubStream {
- public static void main(String[] args) {
- // 创建 Redis 连接
- RedisURI redisURI = RedisURI.Builder.redis("xxxxxx", 6379).build();
- redisURI.setPassword("xxxxxx?");
- RedisClient redisClient = RedisClient.create(redisURI);
- StatefulRedisConnection
connection = redisClient.connect(); - try {
- // 获取同步命令对象
- RedisCommands
syncCommands = connection.sync(); - // 创建消费者组
- String streamKey = "mystream"; // Stream名称
- String groupName = "myConsumer"; // 消费者组名称
- //检查 groupName 是否存在
- boolean groupExists = false;
- List
- for (Object obj : result) {
- ArrayList objList = (java.util.ArrayList) obj;
- if((objList.get(1)+"").equals(groupName)){
- groupExists = true;
- break;
- }
- }
-
- System.out.println("groupExists = " + groupExists);
-
- // 如果 groupName 不存在,则创建
- if (!groupExists) {
- syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(streamKey, "0-0"), groupName, XGroupCreateArgs.Builder.mkstream());
- }
- // 发布消息
- syncCommands.xadd(streamKey, "key1", "val1");
-
- } finally {
- // 关闭连接
- connection.close();
- redisClient.shutdown();
- }
- }
- }
2、实现消息订阅
- /**
- * @Auther: pshdhx
- * @Date: 2023/02/22/10:40
- * @Description: 模拟 tomcat1 使用消费者组1,消费者1向redis 服务器 订阅推送的消息
- */
-
- public class TestSub1_tomcat1 {
- public static void main(String[] args) {
- // 创建 Redis 连接
- RedisURI redisURI = RedisURI.Builder.redis("xxxxxx", 6379).build();
- redisURI.setPassword("xxxxxxxxxx");
- RedisClient redisClient = RedisClient.create(redisURI);
- StatefulRedisConnection
connection = redisClient.connect(); - try {
-
-
- // 从消费者组中获取消息
- String consumerName = "tomcat1_consumer_name"; // 消费者名称
- RedisCommands
streamCommands = connection.sync(); -
- while (true) {
- List
> messages = streamCommands.xreadgroup( - Consumer.from(groupName, consumerName),
- XReadArgs.Builder.block(Duration.ofSeconds(5)), // 阻塞5秒钟等待新消息
- XReadArgs.StreamOffset.lastConsumed(streamKey)
- );
-
- if (messages.isEmpty()) {
- continue; // 在没有新消息时继续轮询
- }
-
- for (StreamMessage
message : messages) { - System.out.println("tomcat 1 Received message: " + message.getBody());
-
- // 手动确认消息已被处理
- syncCommands.xack(streamKey, groupName, message.getId());
- }
- }
-
- } finally {
- // 关闭连接
- connection.close();
- redisClient.shutdown();
- }
- }
- }