public static void publishSync() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.37");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback confirmCallback = ( deliveryTag,multiple)->{
System.out.println("消息确认监听器成功确认:" + deliveryTag);
if(multiple){
ConcurrentNavigableMap<Long,String> confirmed =
outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else {
outstandingConfirms.remove(deliveryTag);
}
};
ConfirmCallback nconfirmCallback= ( deliveryTag,multiple)->{
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息:"+deliveryTag +"-"+message);
};
channel.addConfirmListener(confirmCallback,nconfirmCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String mes = i + "";
channel.basicPublish("",queueName,null,mes.getBytes("UTF-8"));
outstandingConfirms.put(channel.getNextPublishSeqNo(),mes);
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条用时共计:" + (end-begin) );
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79