1、第一步:安装启动rabbitmq
0 第0步:安装yum:rpm -ivh yum-4.2.23-3.oe1.noarch.rpm
1 第1步:获取操作系统和cpu信息
(1)获取linux操作系统详细信息
cat /etc/os-release
cat /etc/redhat-release
(2)查看CPU架构:arch
2 第2步:安装gcc
(1)yum install -y gcc
3 第3步:依赖包安装
(1)yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto -y
4 第4步:安装
(1)Erlang官方下载地址:https://www.erlang.org/downloads
(2)解压:tar -zxvf otp_src_19.3.tar.gz
(3)手动创建erlang 的安装目录:mkdir /usr/local/erlang
(4)进入目录:cd otp_src_19.3
(5)配置erlang的安装信息:./configure --prefix=/usr/local/erlang --without-javac
./configure --prefix=/usr/local/erlang --with-ssl=/usr/include/openssl
(6)编译并安装:make && make install
(7)配置环境变量:vim /etc/profile
(8)将这些配置填写到profile文件的最后:
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
(9)启动环境变量配置文件:source /etc/profile
(10)验证:erl -version
5 第5步:安装rabbitmq
(1)RabbitMQ官方下载地址:https://www.rabbitmq.com/download.html
(2)将RabbitMQ安装包rabbitmq-server-3.7.2-1.el7.noarch.rpm上传到/home目录
linux中的rpm文件,相当于windows中的exe可执行的软件安装包文件,即直接运行rpm文件就完成rabbitmq的安装了。
(3)安装RabbitMQ:rpm -ivh --nodeps rabbitmq-server-3.7.2-1.el7.noarch.rpm
(4)安装管控台插件:rabbitmq-plugins enable rabbitmq_management
6 第6步:启动rabbitmq
(1)启动RabbitMQ
动力节点:rabbitmq-server start &
乐字节:systemctl start rabbitmq-server.service
网上:systemctl start rabbitmq-server
注意:这里可能会出现错误,错误原因是/var/lib/rabbitmq/.erlang.cookie文件权限不够。
解决方案对这个文件授权
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
(2)验证:ps -ef|grep rabbitmq
乐字节:systemctl status rabbitmq-server.service
启动日志:https://www.csdn.net/tags/MtjaYg0sMDE2MDYtYmxvZwO0O0OO0O0O.html
cd /var/log/rabbitmq
tail -1000000000000 rabbit@25Cache.log
(3)停止服务
rabbitmqctl stop
7 第7步:用户管理
1、添加用户:rabbitmqctl add_user {username} {password}
rabbitmqctl add_user root root
2、删除用户:rabbitmqctl delete_user {username}
3、修改密码:rabbitmqctl change_password {username} {newpassword}
rabbitmqctl change_password root 123456
4、设置用户角色:rabbitmqctl set_user_tags {username} {tag}
rabbitmqctl set_user_tags root administrator
8 第8步:登录前台
http://RabbitMQ服务器IP:15672
RabbitMQ安装成功后使用默认用户名guest登录
账号:guest
密码:guest
2、附1:查找命令
whereis openssl
which openssl
3、附2:搜索软件相关的依赖,并安装相关依赖
yum search erlang
yum install erlang-crypto 或 yum install -y erlang
https://blog.csdn.net/qmqm011/article/details/103733146
yum list | grep erlang
yum remove erlang-*
1、第一步:安装Erlang
第1步:yum install erlang
第2步:配置Erlang环境变量
第3步:启用
第4步:验证
2、第二步:安装rabbitmq
第1步:yum install rabbitmq-server
第2步:启动 RabbitMQ 服务,并为其配置开机自启动。
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
第3步:查看状态
systemctl status rabbitmq-server.service and journalctl -xe
cd /var/log/rabbitmq/
rm -rf /var/log/rabbitmq/rabbit@25Cache.log /var/log/rabbitmq/rabbit@25Cache_upgrade.log
tail -1000000f /var/log/rabbitmq/rabbit@25Cache.log
tail -1000000f /var/log/rabbitmq/rabbit@25Cache_upgrade.log
第4步:安装管控台
rabbitmq-plugins enable rabbitmq_management
问题1:line 85: erl: command not found
https://blog.csdn.net/Android_Mrchen/article/details/104368618
https://www.freesion.com/article/5721599319/
问题2:rabbitmq 启动日志文件显示错误:throw:{error,{missing_dependencies,[crypto,ssl]
https://blog.csdn.net/qq_26400953/article/details/98743975
https://blog.csdn.net/liyuling52011/article/details/82750776
https://blog.csdn.net/qq_35946990/article/details/78967660
https://blog.csdn.net/weixin_42297483/article/details/82256461(是的,重装Erlang,配置ssl)
问题3:bad_return sasl,start
问题4:could_not_write_file rabbit@25Cache/cluster_nodes.config
在rabbit@25Cache目录下创建cluster_nodes.config文件,并把(多层)权限调整成777
问题5:error,corrupt_cluster_status_files
# rm -rf /var/lib/rabbitmq/mnesia/*
# systemctl start rabbitmq-server
https://access.redhat.com/solutions/1596043
3、附1:卸载rabbitmq相关的
1、卸载前先停掉rabbitmq服务,执行命令
service rabbitmq-server stop
2、查看rabbitmq安装的相关列表
yum list | grep rabbitmq
3、卸载rabbitmq已安装的相关内容
yum -y remove rabbitmq-server.noarch
4、附2:卸载erlang
1、查看erlang安装的相关列表
yum list | grep erlang
2、卸载erlang已安装的相关内容
yum -y remove erlang-*
yum remove erlang.x86_64
1、direct模式Send:
第一步:创建普通的springboot工程,该工程只依赖rabbitmq
第二步:pom.xml文件
第1步:spring-boot-starter-amqp
第2步:spring-boot-starter-test
第3步:spring-rabbit-test
第三步:application.properties
第1步:配置rabbitmq,ip、端口、账户
第四步:编码
第1步:SendService.class接口
public interface TestService {
void sendMessage(String message);
}
第2步:SendServiceImpl.class
@Service("sendService")
public class SendServiceImpl implements SendService {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String message) {
/**
* 发送消息
* 参数1 为交换机名
* 参数2 为Routingkey
* 参教3 为我们的具体发送的消息数据
*/
amqpTemplate.convertAndSend("bootDirectExchange","bootDirectRouting",message);
}
}
第3步:Application.java
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(DemoApplication.class, args);
SendService sendService = (SendService) ac.getBean("sendService");
sendService.sendMessage("boot direct send 的测试数据");
}
}
第4步:RabbitMqConfig.java
package com.rabbitmq.study.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RibbitMQConfig {
// 配置一个Direct类型的交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("bootDirectExchange");
}
// 队列
@Bean
public Queue directQueue(){
return new Queue("bootDirectQueue");
}
/**
* 配置一个队列与交换机绑定
* @param directQueue 需要绑定的队列的对象(bean对象),
* 参数名必须与@Bean注解下的方法的方法名相同,
* 这样就会自动进行注入
* @param directExchange 需要绑定的交换机的对象(bean对象),
* 参数名必须与@Bean注解下的方法的方法名相同,
* 这样就会自动进行注入
* 注:with()方法是direct模式下交换机和队列绑定时使用的routingkey
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange){
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
}
}
第五步:启动项目
问题1:rabbitmq AmqpIOException: java.io.IOException
(无效)https://blog.csdn.net/qq_24950043/article/details/110704275
解决:控制台端口15672,而后台代码连接的端口5672
问题2:no exchange ' bootDirectExchange in vhost '/',即没有创建bootDirectExchange交换机,使用代码创建一下吧(回到第四步的第3步)
建议:建议在发送和接收方都同时创建交换机和绑定队列,因为direct模式下,发送方和接收方的启动不分先后(而后两种会先启动接收者进行监听)
解决方案1:配置类
解决方案2:注解
第六步:成功发送消息到我们的队列中
2、direct模式Receive:
第一步:创建普通的springboot工程,该工程只依赖rabbitmq
第二步:pom.xml文件
第1步:spring-boot-starter-amqp
第2步:spring-boot-starter-test
第3步:spring-rabbit-test
第三步:application.properties
第1步:配置rabbitmq,ip、端口、账户
第四步:编码:
第1步:ReceiveService.class接口
package com.rabbitmq.study.demo.service;
public interface ReceiveService {
void receiveMessage();
void receiveMessage(String message);
}
第2步:ReceiveServiceImpl.class实现类
package com.rabbitmq.study.demo.service.impl;
import com.rabbitmq.study.demo.service.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {
@Resource
private AmqpTemplate amqpTemplate;
/**
* 这里个接收不是不间断接收消息,每执行一次这个方法只能接收一次消息, 如果有新消息进入则不会自动接收消息不建议使用
*/
public void receiveMessage() {
// 监听队列
//Message bootDirectQueue = amqpTemplate.receive("bootDirectQueue");
String message = (String)amqpTemplate.receiveAndConvert("bootDirectQueue");
System.out.println("DirectQueue message "+message);
}
// 配置一个消息监听器
/**
* @RabbitListener注解用于标记当前方法是一个RabbitMQ的消息监听方法,作用是持续性的自动接收消息
* 这个方法不需要手动调用spring会自动运行这个监听
* 属性
* queues用于指定一个已经存在的队列名,用于进行队列的监听
* @param message 接收到的具 体的消息数据
*/
@RabbitListener
public void receiveMessage(String message) {
System.out.println("DirectQueue message 2 "+message);
/**
* 注意:如果当前监听方法正常结束Spring就会自动确认消息,如果出现异常则不会确认消息
* 因此在消息处理时我们需要做好消息的防止重复处理工作
/
//System.out.println(10/0);
}
}
第3步:Application.java
package com.rabbitmq.study.demo;
import com.rabbitmq.study.demo.service.ReceiveService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class RabbitmqDirectReceiveDljdQinApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(RabbitmqDirectReceiveDljdQinApplication.class, args);
ReceiveService receiveService = (ReceiveService)ac.getBean("receiveService");
//使用了消息监听器接收消息那么就不需要(也不能)调用接收方法来接收消息
//receiveService.receiveMessage();
}
}
第4步:
package com.rabbitmq.study.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RibbitMQConfig {
// 配置一个Direct类型的交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("bootDirectExchange");
}
// 队列
@Bean
public Queue directQueue(){
return new Queue("bootDirectQueue");
}
/**
* 配置一个队列与交换机绑定
* @param directQueue 需要绑定的队列的对象(bean对象),
* 参数名必须与@Bean注解下的方法的方法名相同,
* 这样就会自动进行注入
* @param directExchange 需要绑定的交换机的对象(bean对象),
* 参数名必须与@Bean注解下的方法的方法名相同,
* 这样就会自动进行注入
* 注:with()方法是direct模式下交换机和队列绑定时使用的routingkey
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange){
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
}
}
第五步:启动项目
第六步:成功接收到我们的队列中的消息,并在控制台中打印
3、综合测试结果分析:
1 情况1:第一次
第1步:写好发送方,并启动,发送一条消息,rabbitmq管控台有1条消息。
第2步:写好接收方,并启动。接收方自动接收发送发的消息,rabbitmq管控台消息数量减1。
2 情况2:第二次
第1步:再次重启发送方,再发送一条消息,
第2步:接收方,竟然不会自动消费消息,rabbitmq管控台消息数量不会减1。
结 论:问题是,接收方不能持续监听并接收消息。
解 决:使用spring监听器(spring自动回调的方法)
3 情况3:情况1和情况2成立的前提下,在接收方的接收方法中在输出消息后加入异常(10/0)
第1步:启动发送方,再发送一条消息
第2步:接收方的控制台打印可以看出,出现死循环了,错误日志打个不停。但死循环不是重点,重点是我们的接收方竟然还是可以接收到消息,而且
来到rabbitmq的管控台,发现队列中的消息没有确认掉(减1)。
结 论:注意:如果当前监听方法正常结束Spring就会自动确认消息,如果出现异常则不会确认消息
因此在消息处理时我们需要做好消息的防止重复处理工作
1、fanout模式Receive:
第一步:编码
第1步:
package com.rabbitmq.study.demo.service.impl;
import com.rabbitmq.study.demo.service.ReceiveService;
import jdk.internal.org.objectweb.asm.tree.analysis.Value;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jws.soap.SOAPBinding;
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = {
@QueueBinding(//@QueueBinding注解要完成队列和交换机的
value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
exchange = @Exchange(name="fanoutExchange",type = "fanout")//创建一个交换机
)
}
)
public void fanoutReceiveMessage01(String message) {
System.out.println("fantoutQueue message 2 "+message);
}
@RabbitListener(bindings = {
@QueueBinding(//@QueueBinding注解要完成队列和交换机的
value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
exchange = @Exchange(name="fanoutExchange",type = "fanout")//创建一个交换机
)
}
)
public void fanoutReceiveMessage02(String message) {
System.out.println("fantoutQueue message 2 "+message);
}
}
第2步:RibbitMQConfig.java可以不用,因为fanout模式下是随机队列
第二步:首先,先启动接收者。在rabbitmq管控台,两个随机队列创建成功。
2、fanout模式Send:
第一步:编码
第1步:
package com.rabbitmq.study.demo.service.impl;
import com.rabbitmq.study.demo.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendService")
public class SendServiceImpl implements SendService {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void fanoutSendMessage(String message) {
/**
* 发送消息
* 参数1 为交换机名
* 参数2 为Routingkey
* 参教3 为我们的具体发送的消息数据
*/
amqpTemplate.convertAndSend("fanoutExchange",null,message);
}
}
第2步:可以不绑定,但一定要创建交换机
package com.rabbitmq.study.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RibbitMQConfig {
// 配置一个fanout类型的交换机
@Bean
public FanoutExchange directExchange(){
return new FanoutExchange("fanoutExchange");
}
}
第二步:启动,发送方
3、综合测试:
1 情况1:消息发送方发送一条消息,接收方控制台就打印出两条日志。同时消费一旦被所有的监听者都消费一次后,rabbitmq的管控台就会把随机队列删除掉。
1、topic模式Receive,全注解开发:
第一步:编码
第1步:
package com.rabbitmq.study.demo.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("receiveServiceImpl")
public class ReceiveServiceImpl {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue("topic1"),// 随机队列名称
key = {"aa"},//routingkey
exchange = @Exchange(name="topicExchange",type = "topic")//创建一个交换机
)
}
)
public void topicReceiveMessage01(String message) {
System.out.println("消费者1---aa--- "+message);
}
@RabbitListener(bindings = {
@QueueBinding(//@QueueBinding注解要完成队列和交换机的
value = @Queue("topic2"),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
key = {"aa.*"},//routingkey
exchange = @Exchange(name="topicExchange",type = "topic")//创建一个交换机
)
}
)
public void topicReceiveMessage02(String message) {
System.out.println("消费者2---aa.*--- "+message);
}
@RabbitListener(bindings = {
@QueueBinding(//@QueueBinding注解要完成队列和交换机的
value = @Queue("topic2"),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
key = {"aa.#"},//routingkey
exchange = @Exchange(name="topicExchange",type = "topic")//创建一个交换机
)
}
)
public void topicReceiveMessage03(String message) {
System.out.println("消费者3---aa.#--- "+message);
}
}
}
第二步:启动,发送方
2、topic模式Send,全注解开发:
第一步:编码
第1步:准备交换机
package com.rabbitmq.study.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RibbitMQConfig {
// 配置一个topic类型的交换机
@Bean
public TopicExchange directExchange(){
return new TopicExchange("topicExchange");
}
}
第2步:接口实现
package com.rabbitmq.study.demo.service.impl;
import com.rabbitmq.study.demo.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendService")
public class SendServiceImpl implements SendService {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void topicSendMessage(String message) {
/**
* 发送消息
* 参数1 为交换机名
* 参数2 为Routingkey
* 参教3 为我们的具体发送的消息数据
*/
amqpTemplate.convertAndSend("topicExchange","aa",message);
}
}
第二步:启动程序,发送消息
3、综合测试:
情况1:发送到aa,匹配发送到routingkey=aa或aa.*
1、 direct模式Send,全注解开发:
第一步:application.properties
spring.rabbitmq.publisher-confirm-type=simple
spring.rabbitmq.publisher-returns=true
第二步:编码
第1步:接口
第2步:接口实现类
第3步:配置类
第4步:程序入口类
第5步:Pom.xml,开启消息确认模式和return模式。参考:https://cn-blogs.cn/archives/9323.html,https://www.bilibili.com/read/cv11845848/。
spring.rabbitmq.publisher-confirm-type=simple
spring.rabbitmq.publisher-returns=true
第6步:PublisherConfirmAndReturnConfig.java,开启消息确认模式和return模式。参考:https://cn-blogs.cn/archives/9323.html,https://www.bilibili.com/read/cv11845848/。
package com.rabbitmq.study.demo.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已经送达Exchange");
}else {
// TODO重发的措施等处理工作
System.out.println("消息没有送达Exchange");
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// TODO重发的措施等处理工作
System.out.println("消息没有送达Queue");
}
}
第三步:启动消息发送者
2、direct模式Receive,全注解开发:
第一步:
第1步:接口
第2步:接口实现类,请参考:https://wenku.baidu.com/view/e2a37f8cde3383c4bb4cf7ec4afe04a1b071b0b5.html
package com.rabbitmq.study.demo.service.impl;
import com.rabbitmq.client.Channel;
import com.rabbitmq.study.demo.service.ReceiveService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService {
@Resource
private AmqpTemplate amqpTemplate;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 这里个接收不是不间断接收消息,每执行一次这个方法只能接收一次消息, 如果有新消息进入则不会自动接收消息不建议使用
*/
public void receiveMessage() {
// 监听队列
//Message bootDirectQueue = amqpTemplate.receive("bootDirectQueue");
String message = (String)amqpTemplate.receiveAndConvert("qfbootDirectQueue");
System.out.println("qf DirectQueue message 1 "+message);
}
@Override
public void receiveMessage(String message) {
System.out.println("qf DirectQueue message 2 "+message);
}
// 配置一个消息监听器
/**
* @RabbitListener注解用于标记当前方法是一个RabbitMQ的消息监听方法,作用是持续性的自动接收消息
* 这个方法不需要手动调用spring会自动运行这个监听
* 属性
* queues用于指定一个已经存在的队列名,用于进行队列的监听
* @param message 接收到的具 体的消息数据
*/
@RabbitListener(queues = {"qfbootDirectQueue"})
public void receiveMessage(String msg, Channel channel, Message message) throws IOException {
// 0.获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
// 1.设置key到redis
if(redisTemplate.opsForValue().setIfAbsent(messageId, "0",10, TimeUnit.SECONDS)){
// 2.消费消息
System.out.println("qf 接收到消息---"+ msg);
// 3.设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10, TimeUnit.SECONDS);
// 4.手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
// 5.获取redis中的value即可,如果是1,手动Ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
第3步:配置类
第4步:程序入口类
第5步:使用Redis Desktop Manager对redis进行连接测试
第6步:配置redis
#配置rabbitmq
spring.rabbitmq.host=10.203.4.25
#spring.rabbitmq.host=10.203.5.185
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
#配置redis
spring.redis.host=10.203.4.25
spring.redis.port=6379
spring.redis.database=0
spring.redis.password=Re_dis#567.890
第二步:启动消息接收者
3、综合测试:
情况1:单独启动消息发送者,控制台打印“消息已经送达Exchange”。
单独启动消息接收者,成功接收到消息。
情况2:重启消息发送者,消息接收者成功接收到消息。
1、 direct模式Send,全注解开发:
第一步:编译
第1步:实体类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class People implements Serializable {
// 在实际开发中是不需要写的。
// 在测试环境中必须写,因为Publisher项目和Consumer项目都在一套JVM中,必须制定序列码,否则无法序列化。
public static final long serialVersionUID=1L;
private Integer id;
private String name;
}
第2步:XxxService接口实现类
@Override
public void sendMessage(People people){
amqpTemplate.convertAndSend("bootDirectExchange","bootDirectRouting",people);
}
第3步:程序入口类
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(DemoApplication.class, args);
SendService sendService = (SendService) ac.getBean("sendService");
//sendService.sendMessage("boot direct send 的测试数据");
sendService.sendMessage(new People(1235135135,"zhansan"));
}
}
第二步::启动消息发送方
第三步::到rabbitmq管控台上查看消息
2、direct模式receive,全注解开发:
第一步:编码
第1步:实体类
第2步:编写自动监听消息方法
@RabbitListener(queues = {"sxtbootDirectQueue"})
public void receiveMessage(Message message) {
try {
byte[] body = message.getBody();
InputStream is = new ByteArrayInputStream(body);
ObjectInputStream ois = new ObjectInputStream(is);
People people = (People) ois.readObject();
System.out.println("sxt DirectQueue message 3 "+people);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
第3步:程序入口类监听并接收消息
@SpringBootApplication
public class SxtDirectReceiveApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(SxtDirectReceiveApplication.class, args);
ReceiveService receiveService = (ReceiveService)ac.getBean("receiveService");
//使用了消息监听器接收消息那么就不需要(也不能)调用接收方法来接收消息
//receiveService.receiveMessage();
}
}
第二步:启动消息消费者,成功接收消息
3、综合测试:接收者成功接收实体对象
#配置应用名称 server.servlet.context-path=/sendReceive server.port=8090 #配置rabbitmq spring.rabbitmq.host=10.203.4.25 #spring.rabbitmq.host=10.203.5.185 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=root
@Data
@NoArgsConstructor
@AllArgsConstructor
public class People implements Serializable {
// 在实际开发中是不需要写的。
// 在测试环境中必须写,因为Publisher项目和Consumer项目都在一套JVM中,必须制定序列码,否则无法序列化。
public static final long serialVersionUID=1L;
private Integer id;
private String name;
} package com.rabbitmq.study.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RibbitMQConfig {
// 配置一个Direct类型的交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("sendAndReceiveDirectExchange");
}
// 队列
@Bean
public Queue directQueue(){
return new Queue("sendAndReceiveDirectQueue");
}
/**
* 配置一个队列与交换机绑定
* @param directQueue 需要绑定的队列的对象(bean对象),
* 参数名必须与@Bean注解下的方法的方法名相同,
* 这样就会自动进行注入
* @param directExchange 需要绑定的交换机的对象(bean对象),
* 参数名必须与@Bean注解下的方法的方法名相同,
* 这样就会自动进行注入
* 注:with()方法是direct模式下交换机和队列绑定时使用的routingkey
* @return
*/
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange){
return BindingBuilder.bind(directQueue).to(directExchange).with("sendAndReceiveDirectRouting");
}
}
public interface SendAndReceiveService {
void sendMessage(People people);
void receiveMessage(Message message);
}
@Service("sendService")
public class SendAndReceiveServiceImpl implements SendService {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(People people) {
/**
* 发送消息
* 参数1 为交换机名
* 参数2 为Routingkey
* 参教3 为我们的具体发送的消息数据
*/
amqpTemplate.convertAndSend("sendAndReceiveDirectExchange","sendAndReceiveDirectRouting",people);
}
@RabbitListener(queues = {"sendAndReceiveDirectQueue"})
public void receiveMessage(Message message) {
try {
byte[] body = message.getBody();
InputStream is = new ByteArrayInputStream(body);
ObjectInputStream ois = new ObjectInputStream(is);
People people = (People) ois.readObject();
System.out.println("csnd send and receive DirectQueue message "+people);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
@RestController
public class SendAndReceiveController {
@Autowired
private SendAndReceiveService sendService;
@RequestMapping("/send")
public void sendMessage(){
People people = new People(1234567890,"wangwu");
sendService.sendMessage(people);
}
} package com.rabbitmq.study.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DirectSendReceivePostmanApplication {
public static void main(String[] args) {
SpringApplication.run(DirectSendReceivePostmanApplication.class, args);
}
} 

org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.amqp spring-rabbit-test test org.springframework.boot spring-boot-starter-websocket
#内嵌服务器端口 server.port=8080 #配置rabbitmq spring.rabbitmq.host=192.168.32.133 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=root
- package rabbitmq.studey.demo.mq;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class FanoutRabbitConfig {
-
- /**
- * 定义队列A
- * @return
- */
- @Bean
- public Queue aMessage() {
- return new Queue("q_fanout_A");
- }
-
- /**
- * 定义队列B
- * @return
- */
- @Bean
- public Queue bMessage() {
- return new Queue("q_fanout_B");
- }
-
- /**
- * 定义交换机
- * @return
- */
- @Bean
- FanoutExchange fanoutExchange() {
- return new FanoutExchange("myfanoutExchange");
- }
-
- /**
- * 将队列A和交换机进行绑定
- * @return
- */
- @Bean
- Binding bindingExchangeA() {
- return BindingBuilder.bind(aMessage()).to(fanoutExchange());
- }
-
- /**
- * 将队列B和交换机进行绑定
- * @return
- */
- @Bean
- Binding bindingExchangeB() {
- return BindingBuilder.bind(bMessage()).to(fanoutExchange());
- }
- }
- package rabbitmq.studey.demo.mq;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MsgSenderFanout {
- @Autowired
- private AmqpTemplate rabbitTemplate;
-
- /**
- * 在自己的方法中调用这个类下的这个方法,根据自己不同的需求
- * 接收消息,并将消息传到mq交换机上,绑定了该交换机的队列的消费者都可收到该消息
- * @param message
- */
- public void send(String message) {
- System.out.println("Sender : " + message);
- rabbitTemplate.convertAndSend("myfanoutExchange","", message);
- }
- }
- package rabbitmq.studey.demo.websocket;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.web.socket.server.standard.ServerEndpointExporter;
-
- /**
- * 开启WebSocket支持
- */
- @Configuration
- public class WebSocketConfig {
- @Bean
- public ServerEndpointExporter serverEndpointExporter() {
- return new ServerEndpointExporter();
- }
- }
这个方法可以获取到mq队列中的消息,我这里是将要接收消息的用户id在我的后台方法中拼接成一个字符串,然后放进mq队列中去,这个方法中获取到mq队列中的信息之后,将他进行处理成集合然后与websocket中的登陆用户进行对比
- package rabbitmq.studey.demo.websocket;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
- import javax.websocket.*;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.List;
- import java.util.*;
- import java.util.concurrent.ConcurrentHashMap;
-
- /**
- * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
- * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
- *
- * @RabbitListene注解是用来接收某个队列中的消息,如果该队列有消息,就给@RabbitHandle注解下的方法
- */
- @RabbitListener(queues = "q_fanout_A")
- @ServerEndpoint(value = "/websocket")
- @Component
- public class WebSocketServer {
- //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
- private static int onlineCount = 0;
-
- //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识
- //private static CopyOnWriteArraySet
webSocketSet = new CopyOnWriteArraySet(); - private static ConcurrentHashMap
webSocketSet = new ConcurrentHashMap<>(); - /**
- * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
- */
- private static Map
sessionPools = new HashMap<>(); -
- //与某个客户端的连接会话,需要通过它来给客户端发送数据
- private Session session;
-
- /**
- * 连接建立成功调用的方法
- *
- * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
- */
- @OnOpen
- public void onOpen(Session session) {
- //这个地方我是从session中获取登陆id,然后作为map中的key,session作为value
- //方法我觉得怪怪的,还得循环遍历,哪位大佬要是有什么好的意见给我指点指点
- this.session = session;
- List
uid = session.getRequestParameterMap().get("uid"); - if (!CollectionUtils.isEmpty(uid)) {
- for (String id : uid) {
- webSocketSet.put(id, this); //加入set中
- addOnlineCount(); //在线数加1
- }
- }
-
- System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
- }
-
- /**
- * 连接关闭调用的方法
- */
- @OnClose
- public void onClose(Session session) {
- List
uid = session.getRequestParameterMap().get("uid"); - if (!CollectionUtils.isEmpty(uid)) {
- for (String id : uid) {
- webSocketSet.remove(id); //从set中删除
- subOnlineCount(); //在线数减1
- System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
- }
- }
- }
-
- /**
- * 收到客户端消息后调用的方法
- * @param message 客户端发送过来的消息
- * @param session 可选的参数
- */
- @OnMessage
- public void onMessage(String message, Session session) {
- Map
userIdMap = new HashMap(); - System.out.println("来自客户端的消息:" + message);
- try {
- //这个地方的admin是写死的,我的需求是消息推送,如果用户阅读了消息,我这里直接在前端调用这个方法传admin进来,然后就可以实时更新前端数据了
- if ("admin".equals(message)) {//阅读类型的
- for (String key : webSocketSet.keySet()) {
- webSocketSet.get(key).sendMessage(message);
- }
- } else {
- String[] messageAry = message.split(",");
- for (String id : messageAry) {
- userIdMap.put(id, id);
- }
- //群发消息
- for (String key : webSocketSet.keySet()) {
- if (userIdMap.get(key) != null) {
- webSocketSet.get(key).sendMessage(message);
- }
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- /**
- * 发生错误时调用
- *
- * @param session
- * @param error
- */
- @OnError
- public void onError(Session session, Throwable error) {
- System.out.println("发生错误");
- error.printStackTrace();
- }
-
- /**
- * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
- *
- * @param message
- * @throws IOException
- */
- public void sendMessage(String message) throws IOException {
- this.session.getAsyncRemote().sendText(message);
- }
-
- /**
- * 查询连接人数的方法
- * @return
- */
- public static synchronized int getOnlineCount() {
- return onlineCount;
- }
-
- /**
- * 连接人数+1的方法
- * @return
- */
- public static synchronized void addOnlineCount() {
- WebSocketServer.onlineCount++;
- }
-
- /**
- * 连接人数-1的方法
- * @return
- */
- public static synchronized void subOnlineCount() {
- WebSocketServer.onlineCount--;
- }
-
- /**
- * 这个方法没用到,但是也先留着,说不定哪天就用到了
- * 发送信息给指定ID用户,如果用户不在线则返回不在线信息给自己
- *
- * @param message
- * @param sendUserId
- * @throws IOException
- */
- public void sendtoUser(String message, List
sendUserId) throws IOException { - for (String userId : sendUserId) {
- if (webSocketSet.get(userId) != null) {
- webSocketSet.get(userId).sendMessage(message);
- }
- }
-
- }
-
- /**
- * rabbit订阅队列中的消息(发送消息)
- *
- * @param message
- */
- @RabbitHandler
- public void process(String message) {
- onMessage(message, session);
- }
- }
package rabbitmq.studey.demo;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableRabbit/**开启rabbitmq*/
public class RabbitmqBootWosocketDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqBootWosocketDemoApplication.class, args);
}
}
- html>
- <html>
- <head>
- <title>Java后端WebSocket的Tomcat实现title>
- <link rel="stylesheet" href="../../../static/layui2/css/layui.css" media="all"/>
- <link rel="stylesheet" href="../../../static/css/font_eolqem241z66flxr.css" media="all"/>
- <link rel="stylesheet" href="../../../static/css/user.css" media="all"/>
- head>
- <body>
- Welcome<br/><input id="text" type="text"/>
- <button onclick="send()">发送消息button>
- <hr/>
- <button onclick="closeWebSocket()">关闭WebSocket连接button>
- <hr/>
- <div id="message">div>
- body>
- <script type="text/javascript" src="../../../static/js/jquery.js">script>
- <script type="text/javascript" src="../../../static/layui2/layui.js">script>
- <script type="text/javascript" src="../../../static/js/queryUtils.js">script>
- <script type="text/javascript">
- var websocket = null;
- //判断当前浏览器是否支持WebSocket
- if ('WebSocket' in window) {
- //这个地方特别坑,建立连接的地址在网上众说纷纭
- //我个人实现的方法是
- //ws:如果你是http请求就写ws;如果你是https请求得改成wss
- //localhost:8080:项目启动地址(本地),如果登陆页面是:http://localhost:8080/login,就用localhsot:8080
- //websocket:这个写上WebSocketServer类上@ServerEndpoint注解后的value值
- //uid:这个是传到后台去的登陆id
- websocket = new WebSocket("ws://localhost:8080/websocket?uid=6");
- } else {
- alert('当前浏览器 Not support websocket')
- }
-
- //连接发生错误的回调方法
- websocket.onerror = function () {
- setMessageInnerHTML("WebSocket连接发生错误");
- };
-
- //连接成功建立的回调方法
- websocket.onopen = function () {
- setMessageInnerHTML("WebSocket连接成功");
- }
-
- //接收到消息的回调方法
- websocket.onmessage = function (res) {
- setMessageInnerHTML(res.data);
- }
-
- //连接关闭的回调方法
- websocket.onclose = function () {
- setMessageInnerHTML("WebSocket连接关闭");
- }
-
- //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
- window.onbeforeunload = function () {
- closeWebSocket();
- }
-
- //将消息显示在网页上
- function setMessageInnerHTML(innerHTML) {
- document.getElementById('message').innerHTML += innerHTML + '
'; - }
-
- //关闭WebSocket连接
- function closeWebSocket() {
- websocket.close();
- }
-
- //发送消息,这是websocket的实时消息
- function send() {
- var message = document.getElementById('text').value;
- websocket.send(message);
- }
- script>
- html>



参考:
springboot项目下WebSocket+RabbitMq的搭建(代码)
springboot项目下WebSocket+RabbitMq的搭建(代码备用)