@Builder
public class DynamicContainerFactoryBean implements FactoryBean<SimpleMessageListenerContainer> {
@Getter
private String queue;
private String exchangeType;
@Getter
private String exchange;
private String binding;
private ConnectionFactory connectionFactory;
private AmqpAdmin amqpAdmin;
private boolean durable = true;
private String routeKey;
private boolean autoAck;
private MessageListener listener;
@Override
public SimpleMessageListenerContainer getObject() throws Exception {
//声名队列,交换机,binding
amqpAdmin.declareQueue(declareQueue());
amqpAdmin.declareExchange(declareExchange());
amqpAdmin.declareBinding(declareBinding(declareQueue(), declareExchange()));
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setQueueNames(queue);
container.setAutoStartup(false);
container.setMessageListener(listener);
container.setAmqpAdmin(amqpAdmin);
container.setAutoDeclare(true);
container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
container.setConnectionFactory(connectionFactory);
return container;
}
private Binding declareBinding(Queue declareQueue, Exchange declareExchange) {
return BindingBuilder.bind(declareQueue).to(declareExchange).with(routeKey).noargs();
}
private Exchange declareExchange() {
switch (exchangeType) {
case ExchangeTypes.DIRECT:
return ExchangeBuilder.directExchange(exchange).durable(true).build();
case ExchangeTypes.TOPIC:
return ExchangeBuilder.topicExchange(exchange).durable(true).build();
case ExchangeTypes.FANOUT:
return ExchangeBuilder.fanoutExchange(exchange).durable(true).build();
case ExchangeTypes.HEADERS:
return ExchangeBuilder.headersExchange(exchange).durable(true).build();
}
throw new RuntimeException("不支持的交换器类型");
}
private Queue declareQueue() {
return QueueBuilder.durable(queue).build();
}
@Override
public Class<?> getObjectType() {
return SimpleMessageListenerContainer.class;
}
}
/**
 * Registry of dynamically created listener containers, keyed by container name, with bulk
 * start/stop/purge operations.
 */
@Component
public class DynamicListenerManager {

    private final Map<String, SimpleMessageListenerContainer> map = new ConcurrentHashMap<>();

    /**
     * Registers a container under the given name. If a different container was already
     * registered under that name it is stopped, because it is no longer reachable through
     * this manager afterwards.
     *
     * @param name      registry key
     * @param container container to register
     */
    public void add(String name, SimpleMessageListenerContainer container) {
        SimpleMessageListenerContainer previous = map.put(name, container);
        if (previous != null && previous != container) {
            previous.stop();
        }
    }

    /** Starts every registered container. */
    public void start() {
        for (SimpleMessageListenerContainer container : map.values()) {
            container.start();
        }
    }

    /** Stops every registered container. */
    public void stop() {
        for (SimpleMessageListenerContainer container : map.values()) {
            container.stop();
        }
    }

    /**
     * Removes every registered container, stopping each one as it is removed so that no
     * container keeps running after it has been dropped from the registry.
     */
    public void purge() {
        for (String name : map.keySet()) {
            SimpleMessageListenerContainer removed = map.remove(name);
            if (removed != null) {
                removed.stop();
            }
        }
    }
}
/**
 * Channel-aware listener that prints each message body to standard output. When the
 * container's acknowledge mode is not an auto-ack mode, the listener acknowledges the
 * delivery itself after processing and negatively acknowledges it (with requeue) when an
 * exception is thrown.
 */
public class MyMessageListener implements ChannelAwareMessageListener {

    /** Container this listener was created for; stored but not otherwise read in this class. */
    private final SimpleMessageListenerContainer container;

    /** Captured once from the container's acknowledge mode at construction time. */
    private final boolean autoAck;

    /**
     * @param container container whose acknowledge mode decides whether this listener
     *                  acknowledges deliveries itself
     */
    public MyMessageListener(SimpleMessageListenerContainer container) {
        this.container = container;
        this.autoAck = container.getAcknowledgeMode().isAutoAck();
    }

    /**
     * Processes one delivery.
     *
     * @param message the delivered message
     * @param channel the channel the message was received on
     * @throws Exception if acknowledging or rejecting the delivery fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        if (autoAck) {
            // Acknowledgement is not handled here in this mode; just process.
            doOnMessage(message, channel);
            return;
        }

        try {
            doOnMessage(message, channel);
            channel.basicAck(deliveryTagOf(message), false);
        } catch (Exception failure) {
            failure.printStackTrace();
            // Reject this single delivery and ask the broker to requeue it.
            channel.basicNack(deliveryTagOf(message), false, true);
        }
    }

    /** Reads the delivery tag from the message properties. */
    private long deliveryTagOf(Message message) {
        return message.getMessageProperties().getDeliveryTag();
    }

    /** Decodes the body as UTF-8 and prints it. */
    private void doOnMessage(Message message, Channel channel) {
        String text = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("dynamic msg: " + text);
    }

    /** Intentionally a no-op; deliveries are handled by {@link #onMessage(Message, Channel)}. */
    @Override
    public void onMessage(Message message) {
    }
}
/**
 * Creates listener containers at runtime from a {@link DynamicContainerDto}, registers them
 * with the {@link DynamicListenerManager}, and remembers the factory beans so the declared
 * queues and exchanges can be deleted again.
 */
@Component
public class DynamicContainerFactory {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private ConnectionFactory factory;

    @Autowired
    private DynamicListenerManager dynamicListenerManager;

    /** Factory beans by container name; used by {@link #purge()} to find queues/exchanges. */
    private final Map<String, DynamicContainerFactoryBean> map = new ConcurrentHashMap<>();

    /**
     * Builds a container from the request, attaches a {@link MyMessageListener} and registers
     * it under the request's container name. The container is not started here.
     *
     * @param dynamicContainerDto description of the queue, exchange, binding and container
     * @throws IllegalArgumentException if the container name is {@code null}
     * @throws RuntimeException         wrapping any failure while creating the container
     */
    public void dynamicCreate(DynamicContainerDto dynamicContainerDto) {
        String containerName = dynamicContainerDto.getContainerName();
        if (containerName == null) {
            // Both registries are ConcurrentHashMaps and reject null keys; fail before
            // anything is declared on the broker instead of afterwards.
            throw new IllegalArgumentException("containerName must not be null");
        }
        try {
            DynamicContainerFactoryBean factoryBean = DynamicContainerFactoryBean.builder()
                    .amqpAdmin(rabbitAdmin)
                    .autoAck(dynamicContainerDto.isAutoAck())
                    .durable(dynamicContainerDto.isDurable())
                    .routeKey(dynamicContainerDto.getRouteKey())
                    .connectionFactory(factory)
                    .exchangeType(dynamicContainerDto.getExchangeType())
                    .queue(dynamicContainerDto.getQueue())
                    .exchange(dynamicContainerDto.getExchange())
                    .binding(dynamicContainerDto.getBinding())
                    .build();
            SimpleMessageListenerContainer container = factoryBean.getObject();
            // The listener needs the container's acknowledge mode, so it is created after
            // the container and attached afterwards.
            MyMessageListener myMessageListener = new MyMessageListener(container);
            container.setMessageListener(myMessageListener);
            dynamicListenerManager.add(containerName, container);
            map.put(containerName, factoryBean);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create container '" + containerName + "'", e);
        }
    }

    /** Starts all containers registered with the listener manager. */
    public void start() {
        dynamicListenerManager.start();
    }

    /** Stops all containers registered with the listener manager. */
    public void stop() {
        dynamicListenerManager.stop();
    }

    /**
     * Stops every registered container, drops them from the listener manager, and then
     * deletes the queues and exchanges that were declared for them.
     */
    public void purge() {
        // Consumers must be stopped and unregistered before their queues disappear;
        // otherwise they stay registered (and possibly running) against deleted queues.
        dynamicListenerManager.stop();
        dynamicListenerManager.purge();

        for (DynamicContainerFactoryBean factoryBean : map.values()) {
            rabbitAdmin.deleteQueue(factoryBean.getQueue());
            rabbitAdmin.deleteExchange(factoryBean.getExchange());
        }
        map.clear();
    }
}
/**
 * REST endpoints for declaring queues, exchanges and bindings through {@link RabbitAdmin}
 * and for creating, starting, stopping and purging dynamic listener containers.
 */
@RestController
public class DynamicController {

    private static final String SUCCESS = "success";
    private static final String FAILED = "failed";

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private DynamicContainerFactory dynamicContainerFactory;

    /**
     * Declares a durable queue.
     *
     * @param name queue name
     * @return {@code "success"} if the declaration returned a non-blank name,
     *         {@code "failed"} otherwise
     */
    @GetMapping("/queue/create/{name}")
    public String createQueue(@PathVariable("name") String name) {
        String declaredName = rabbitAdmin.declareQueue(QueueBuilder.durable(name).build());
        if (!StringUtils.hasText(declaredName)) {
            return FAILED;
        }
        System.out.println("create queue: " + declaredName);
        return SUCCESS;
    }

    /**
     * Declares a durable direct exchange.
     *
     * @param name exchange name
     * @return {@code "success"}
     */
    @GetMapping("/ex/create/{name}")
    public String createExchange(@PathVariable("name") String name) {
        Exchange directExchange = ExchangeBuilder.directExchange(name)
                .durable(true)
                .build();
        rabbitAdmin.declareExchange(directExchange);
        System.out.println("create exchange: " + directExchange);
        return SUCCESS;
    }

    /**
     * Declares a binding between a queue and a direct exchange.
     *
     * @param bindingDto queue name, exchange name and routing key of the binding
     * @return {@code "success"}
     */
    @GetMapping("/binding/create")
    public String createBinding(BindingDto bindingDto) {
        Queue boundQueue = new Queue(bindingDto.getQueue());
        DirectExchange targetExchange = new DirectExchange(bindingDto.getExchange());
        Binding binding = BindingBuilder.bind(boundQueue)
                .to(targetExchange)
                .with(bindingDto.getRouteKey());
        rabbitAdmin.declareBinding(binding);
        System.out.println("create binding: " + binding);
        return SUCCESS;
    }

    /**
     * Creates a listener container by delegating to
     * {@link DynamicContainerFactory#dynamicCreate(DynamicContainerDto)}.
     *
     * @param req container description
     * @return {@code "success"}
     */
    @GetMapping("/container/create")
    public String createContainer(DynamicContainerDto req) {
        dynamicContainerFactory.dynamicCreate(req);
        return SUCCESS;
    }

    /**
     * Starts listening by delegating to {@link DynamicContainerFactory#start()}.
     *
     * @return {@code "success"}
     */
    @GetMapping("/container/start")
    public String containerStart() {
        dynamicContainerFactory.start();
        return SUCCESS;
    }

    /**
     * Stops listening by delegating to {@link DynamicContainerFactory#stop()}.
     *
     * @return {@code "success"}
     */
    @GetMapping("/container/stop")
    public String containerStop() {
        dynamicContainerFactory.stop();
        return SUCCESS;
    }

    /**
     * Deletes the containers by delegating to {@link DynamicContainerFactory#purge()}.
     *
     * @return {@code "success"}
     */
    @GetMapping("/container/purge")
    public String containerPurge() {
        dynamicContainerFactory.purge();
        return SUCCESS;
    }
}
// Good luck!