消息(Message)是指在应用中传递的数据,可以是字符串或json数据
消息中间件是指利用高效可靠的消息传递机制,一般有两种传递模式:点对点模式和发布订 阅模式
点对点模式(p2p,point to point)是基于队列的,消息生产者发送消息到队列,消息消费者从队列接受消息,队列的存在使消息的异步传输成为了可能。
发布订阅模式定义了如何向一个内存节点发布订阅消息,这个内容节点称为主题(topic),主题相当于中介,消息发布者发布到一个主题,消息订阅者在一个主题订阅,发布者和订阅者不用接触便能保证消息的传递,发布和订阅在消息的一对多广播时采用。
解耦:消息中间件提供了对应的api,消息生产者和消费者依赖对应的api,可以不用互相调用对方实现消息的传递
沉余(存储):处理数据的过程可能会失败,消息中间件可以把数据持久化直到他们完全被处理,保证数据的安全性
拓展性:消息中间件解耦了消息的处理过程,提高队列的入队和处理的效率时很容易的,只要增加其处理过程
削峰:在访问量剧增下,消息中间件关键组件能支撑访问压力,不会因超负荷请求而崩溃
可恢复性:一部分组件失效后,不会影响整个系统,一个处理消息的进程挂掉,加入中间件的消息仍能在系统恢复之后进行处理
顺序保证:可以保证数据顺序处理
缓冲:消息中间件通过缓冲层来帮助任务更高效率的执行,有助于优化和控制数据流经过系统的速度
异步通信:可以将不需要及时处理的消息放入消息队列,之后慢慢处理
添加一个队列
生产者示例代码
- public class RabbitProducer {
-
- public static final String EXCHANGE_NAME = "exchange_name";
- public static final String ROUTING_KEY = "toutingkey_demo";
- public static final String QUEUE_NAME = "queue_demo";
- public static final String IP_ADDRESS = "127.0.0.1";
- /**
- * rabbitmq默认端口
- */
- public static final int PORT = 5672;
-
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(IP_ADDRESS);
- factory.setPort(PORT);
- factory.setUsername("guest");
- factory.setPassword("guest");
- //创建连接
- Connection connection = factory.newConnection();
- //创建信道
- Channel channel = connection.createChannel();
- //创建一个type="direct",持久化的,非自动删除的交换器
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
- //创建一个持久化,非排他的,非自动删除的队列
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
- //发送一条持久化的消息:Hello World!
- String message = "Hello World!";
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message.getBytes());
- channel.close();
- connection.close();
-
- }
-
-
- }
消费者示例代码
- public class RabbitConsumer {
-
- public static final String QUEUE_NAME = "queue_demo";
- public static final String IP_ADDRESS = "127.0.0.1";
- /**
- * rabbitmq默认端口
- */
- public static final int PORT = 5672;
-
- public static void main(String[] args) throws Exception {
- Address[] addresses = new Address[]{
- new Address(IP_ADDRESS, PORT)
- };
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername("guest");
- factory.setPassword("guest");
- //创建连接
- Connection connection = factory.newConnection(addresses);
- //创建信道
- Channel channel = connection.createChannel();
- //设置客户端最多接受未被ack的消息的个数
- channel.basicQos(64);
- Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("recv message:" + new String(body));
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
-
- channel.basicConsume(QUEUE_NAME, consumer);
- //等待回调行数执行完毕之后,关闭资源
- TimeUnit.SECONDS.sleep(5);
- channel.close();
- connection.close();
-
-
- }
-
-
- }
出自RabbitMQ实战指南