配置消息处理器
- import java.util.UUID;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.integration.annotation.IntegrationComponentScan;
- import org.springframework.integration.annotation.ServiceActivator;
- import org.springframework.integration.channel.DirectChannel;
- import org.springframework.integration.config.EnableIntegration;
- import org.springframework.integration.core.MessageProducer;
- import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
- import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
- import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
- import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
- import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.MessageHandler;
-
- /**
- * Spring Integration configuration for a demo MQTT client. It declares the Paho client
- * factory, an outbound handler fed by {@code mqttOutboundChannel}, an inbound adapter that
- * forwards received messages to {@code mqttInputChannel}, and a handler that prints them.
- */
- @Configuration
- @IntegrationComponentScan
- @EnableIntegration
- public class MqttClientConfig {
-
- /**
- * Connection factory used by both the outbound handler and the inbound adapter.
- * The user name, password and broker URI below are demo placeholders.
- */
- @Bean
- public MqttPahoClientFactory mqttClientFactory() {
- DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
- MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
- mqttConnectOptions.setUserName("test1");
- mqttConnectOptions.setPassword("123456".toCharArray());
- mqttConnectOptions.setServerURIs(new String[]{"tcp://xxx:1883"});
- // Keep-alive interval; Paho documents this value as seconds.
- mqttConnectOptions.setKeepAliveInterval(2);
- mqttConnectOptions.setAutomaticReconnect(true);
- factory.setConnectionOptions(mqttConnectOptions);
- return factory;
- }
-
-
- /**
- * Returns a fresh random client id; each MQTT client created here gets its own.
- */
- private String createClientId() {
- return UUID.randomUUID().toString();
- }
-
- /**
- * Publishing client: handles every message sent to {@code mqttOutboundChannel}.
- */
- @Bean
- @ServiceActivator(inputChannel = "mqttOutboundChannel")
- public MessageHandler mqttOutbound() {
- MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
- createClientId(), mqttClientFactory());
- messageHandler.setAsync(true);
- messageHandler.setDefaultQos(2);
- messageHandler.setDefaultRetained(false); // do not retain published messages
- return messageHandler;
- }
-
- /**
- * Channel that carries messages to be published.
- */
- @Bean
- public MessageChannel mqttOutboundChannel() {
- return new DirectChannel();
- }
-
- /**
- * Channel that carries messages received from the broker.
- */
- @Bean
- public MessageChannel mqttInputChannel() {
- return new DirectChannel();
- }
-
- /**
- * Subscribing client: listens on the configured topics and forwards every received
- * message to {@code mqttInputChannel}.
- */
- @Bean
- public MessageProducer inbound() {
- String[] topics = {"first_topic"};
- MqttPahoMessageDrivenChannelAdapter adapter =
- new MqttPahoMessageDrivenChannelAdapter(createClientId(),
- mqttClientFactory(), topics);
- adapter.setCompletionTimeout(3_000);
- adapter.setConverter(new DefaultPahoMessageConverter());
- adapter.setQos(2);
- adapter.setOutputChannel(mqttInputChannel());
- return adapter;
- }
-
- /**
- * Message handler for {@code mqttInputChannel}: prints the topic and payload of each
- * received message.
- */
- @Bean
- @ServiceActivator(inputChannel = "mqttInputChannel")
- public MessageHandler handler() {
- return (message -> {
- // String.valueOf tolerates a missing topic header instead of throwing a
- // NullPointerException when the message did not come from the MQTT adapter.
- String topic = String.valueOf(message.getHeaders().get("mqtt_receivedTopic"));
- String payload = message.getPayload().toString();
- System.out.println("消息主题:" + topic);
- System.out.println("消息内容:" + payload);
- });
- }
- }
配置消息通道MessageChannel和消息处理器,消息通道有两个,一个用来发送消息,一个用来接收消息。
配置消息网关
- import org.springframework.integration.annotation.MessagingGateway;
- import org.springframework.integration.mqtt.support.MqttHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- /**
- * Messaging gateway whose calls are turned into messages on {@code mqttOutboundChannel}.
- */
- @Component
- @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
- public interface MqttGateway {
-
- /**
- * Sends {@code data} as the message payload; {@code topic} is passed as the
- * {@code MqttHeaders.TOPIC} header.
- */
- void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
- }
加上@Component注解是为了IDE不报错,不加也不影响功能。
客户端使用
- import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-
- /**
- * Minimal client: boots the Spring context and publishes one message through the gateway.
- */
- public class TestSpringMqtt {
-
- public static void main(String[] args) {
- // Build the application context from the MQTT configuration class.
- AnnotationConfigApplicationContext context =
- new AnnotationConfigApplicationContext(MqttClientConfig.class);
- // Close the context when the JVM shuts down. It is deliberately not closed right after
- // sending, because that would stop the inbound adapter before the message comes back.
- context.registerShutdownHook();
- // Look the gateway up by name together with its expected type, so no cast is needed.
- MqttGateway mqttGateway = context.getBean("mqttGateway", MqttGateway.class);
- // Publish a message through the gateway.
- mqttGateway.sendToMqtt("hello", "first_topic");
- }
- }
@IntegrationComponentScan注解
先来分析@IntegrationComponentScan注解,它会导入IntegrationComponentScanRegistrar配置类。
- // Abridged excerpt of the framework class imported by @IntegrationComponentScan.
- public class IntegrationComponentScanRegistrar implements ImportBeanDefinitionRegistrar,
- ResourceLoaderAware, EnvironmentAware {
-
- private final Map<TypeFilter, ImportBeanDefinitionRegistrar> componentRegistrars =
- new HashMap<TypeFilter, ImportBeanDefinitionRegistrar>();
-
- // Types annotated with @MessagingGateway are handed to MessagingGatewayRegistrar.
- public IntegrationComponentScanRegistrar() {
- this.componentRegistrars.put(new AnnotationTypeFilter(MessagingGateway.class, true),
- new MessagingGatewayRegistrar());
- }
-
- @Override
- public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
- Map<String, Object> componentScan =
- importingClassMetadata.getAnnotationAttributes(IntegrationComponentScan.class.getName());
-
- Collection<String> basePackages = getBasePackages(importingClassMetadata, registry);
-
- // Fall back to the package of the importing class when no base package is configured.
- if (basePackages.isEmpty()) {
- basePackages = Collections.singleton(ClassUtils.getPackageName(importingClassMetadata.getClassName()));
- }
-
- ClassPathScanningCandidateComponentProvider scanner =
- new ClassPathScanningCandidateComponentProvider(false, this.environment) {
-
- @Override
- protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
- return beanDefinition.getMetadata().isIndependent()
- && !beanDefinition.getMetadata().isAnnotation();
- }
-
- };
-
- filter(registry, componentScan, scanner); // NOSONAR - never null
-
- scanner.setResourceLoader(this.resourceLoader);
-
- // Start scanning.
- for (String basePackage : basePackages) {
- Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
- for (BeanDefinition candidateComponent : candidateComponents) {
- if (candidateComponent instanceof AnnotatedBeanDefinition) {
- for (ImportBeanDefinitionRegistrar registrar : this.componentRegistrars.values()) {
- // Delegate to the concrete registrar; here that is MessagingGatewayRegistrar.
- registrar.registerBeanDefinitions(((AnnotatedBeanDefinition) candidateComponent).getMetadata(),
- registry);
- }
- }
- }
- }
- }
-
- }
此注解作用类似于@ComponentScan注解,会扫描包含@MessagingGateway注解的Class,并注册到容器中。继续分析MessagingGatewayRegistrar
- // Abridged excerpt: registers a bean definition for each @MessagingGateway interface.
- public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar {
-
- @Override
- public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
- if (importingClassMetadata != null && importingClassMetadata.isAnnotated(MessagingGateway.class.getName())) {
- Map<String, Object> annotationAttributes =
- importingClassMetadata.getAnnotationAttributes(MessagingGateway.class.getName());
- BeanDefinitionReaderUtils.registerBeanDefinition(this.parse(annotationAttributes), registry);
- }
- }
-
- public BeanDefinitionHolder parse(Map<String, Object> gatewayAttributes) {
- // Build the concrete BeanDefinition (serviceInterface and id come from elided code).
- String defaultPayloadExpression = (String) gatewayAttributes.get("defaultPayloadExpression");
- // Key point: the interface is implemented by GatewayProxyFactoryBean.
- BeanDefinitionBuilder gatewayProxyBuilder =
- BeanDefinitionBuilder.genericBeanDefinition(GatewayProxyFactoryBean.class);
- AbstractBeanDefinition beanDefinition = gatewayProxyBuilder.getBeanDefinition();
- beanDefinition.addMetadataAttribute(new BeanMetadataAttribute(IntegrationConfigUtils.FACTORY_BEAN_OBJECT_TYPE,
- serviceInterface));
-
- return new BeanDefinitionHolder(beanDefinition, id);
- }
-
- }
-
使用GatewayProxyFactoryBean类来作为接口的具体实现。
- // Abridged excerpt: the factory bean that backs every @MessagingGateway interface.
- public class GatewayProxyFactoryBean extends AbstractEndpoint
- implements TrackableComponent, FactoryBean<Object>, MethodInterceptor, BeanClassLoaderAware {
-
- protected void onInit() {
- synchronized (this.initializationMonitor) {
- // Create the dynamic proxy. serviceInterface is the MqttGateway we declared,
- // and the interceptor is this GatewayProxyFactoryBean itself.
- this.serviceProxy = new ProxyFactory(this.serviceInterface, this).getProxy(this.beanClassLoader);
- }
- }
-
- @Override
- public Object getObject() {
- if (this.serviceProxy == null) {
- this.onInit();
- }
- // Return the proxy created above.
- return this.serviceProxy;
- }
- }
GatewayProxyFactoryBean使用动态代理(底层是Cglib或JDK)创建一个代理对象,拦截器就是自身,所以当我们调用MqttGateway的sendToMqtt()方法时,
就会被拦截到GatewayProxyFactoryBean的invoke()方法。
发送Mqtt消息流程分析
从GatewayProxyFactoryBean类的invoke()方法开始
- // Interceptor entry point: calls made on the gateway proxy arrive here (abridged excerpt).
- @Override
- @Nullable
- public Object invoke(final MethodInvocation invocation) throws Throwable { // NOSONAR
- // Continue with the actual invocation handling.
- return doInvoke(invocation, true);
- }
- // Abridged excerpt: several locals used below are declared in code the article elides.
- @Nullable
- private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningOnCallerThread) {
- Method method = invocation.getMethod();
- // Look up the MethodInvocationGateway registered for this method.
- MethodInvocationGateway gateway = this.gatewayMap.get(method);
- // Send the message and receive the response.
- response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, shouldReply);
- // Adapt the response to the declared return type.
- return response(gateway.returnType, shouldReturnMessage, response);
- }
具体发送的处理会交给MethodInvocationGateway来处理,进入它的父类MessagingGatewaySupport的send()方法
- // Abridged excerpt: the catch clause of the try block below is omitted.
- protected void send(Object object) {
- // Resolve the message channel bean named by the defaultRequestChannel attribute of
- // @MessagingGateway.
- MessageChannel channel = getRequestChannel();
- try {
- this.messagingTemplate.convertAndSend(channel, object, this.historyWritingPostProcessor);
- }
- }
这个消息通道对象就是我们在MqttClientConfig配置类中声明的如下Bean,具体类型为DirectChannel。
- // Channel bean referenced by the gateway's defaultRequestChannel; a DirectChannel.
- @Bean
- public MessageChannel mqttOutboundChannel() {
- return new DirectChannel();
- }
我们继续分析messagingTemplate的convertAndSend()方法,最终进入GenericMessagingTemplate的doSend()方法
- protected final void doSend(MessageChannel channel, Message> message, long timeout) {
- Message> messageToSend = message;
- // 委托给channel对象来发送消息,这里的channel具体类型为上面定义的DirectChannel
- boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));
- }
进入DirectChannel的父类AbstractMessageChannel的send()方法
- @Override // NOSONAR complexity
- public boolean send(Message> messageArg, long timeout) {
- Message> message = messageArg;
- try {
- // 消息类型转换
- message = convertPayloadIfNecessary(message);
- // 实际发送消息
- sent = doSend(message, timeout);
- return sent;
- }
- }
- // Abridged excerpt; the catch clause of the try block is omitted.
- @Override
- protected boolean doSend(Message<?> message, long timeout) {
- try {
- // The MessageDispatcher used here is a UnicastingDispatcher.
- return getRequiredDispatcher().dispatch(message);
- }
- }
DirectChannel中定义的消息分发器类型为UnicastingDispatcher,这是一个单播分发器,表示这条消息只会由一个消息处理器来处理,区别于广播分发器BroadcastingDispatcher。
- // Abridged excerpt of UnicastingDispatcher.doDispatch(); the catch clause is omitted.
- private boolean doDispatch(Message<?> message) {
- // Try the optimized path first: with a single handler, hand the message straight to it.
- if (tryOptimizedDispatch(message)) {
- return true;
- }
- boolean success = false;
- // Several handlers are subscribed.
- Iterator<MessageHandler> handlerIterator = getHandlerIterator(message);
- List<RuntimeException> exceptions = null;
- // Leave the loop as soon as one handler succeeds.
- while (!success && handlerIterator.hasNext()) {
- MessageHandler handler = handlerIterator.next();
- try {
- // Let the handler process the message.
- handler.handleMessage(message);
- success = true; // we have a winner.
- }
- }
- return success;
- }
其实这里的消息处理器就是我们在MqttClientConfig配置类中定义的如下Bean
- /**
- * Publishing client: handles every message sent to {@code mqttOutboundChannel}.
- */
- @Bean
- @ServiceActivator(inputChannel = "mqttOutboundChannel")
- public MessageHandler mqttOutbound() {
- MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
- createClientId(), mqttClientFactory());
- messageHandler.setAsync(true);
- messageHandler.setDefaultQos(2);
- messageHandler.setDefaultRetained(false); // do not retain published messages
- return messageHandler;
- }
继续分析MqttPahoMessageHandler是如何处理消息的
- // Abridged excerpt of the outbound MQTT message handler.
- public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
- implements MqttCallback, ApplicationEventPublisherAware {
-
-
- @Override
- protected void handleMessageInternal(Message<?> message) throws Exception {
- Object mqttMessage = this.converter.fromMessage(message, Object.class);
- String topic = this.topicProcessor.processMessage(message);
- // Publish, falling back to the default topic when the message carries none.
- publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
- }
-
- @Override
- protected void publish(String topic, Object mqttMessage, Message<?> message) throws Exception {
- // Obtain the MqttAsyncClient built from the MqttPahoClientFactory and send the MQTT
- // message to the broker.
- IMqttDeliveryToken token = checkConnection()
- .publish(topic, (MqttMessage) mqttMessage);
- }
- }
至此Mqtt消息发送的流程就结束了,总结一下
但Spring是什么时候向消息分发器中添加消息处理器的呢,这个就涉及到@ServiceActivator注解的功能了。而@ServiceActivator注解功能的开启需要@EnableIntegration注解,
所以接下来继续分析@EnableIntegration注解。
@EnableIntegration注解
从注解的名称可以简单看出来,开启集成功能,它会引入IntegrationRegistrar配置类。
- // Abridged excerpt of the registrar imported by @EnableIntegration.
- public class IntegrationRegistrar implements ImportBeanDefinitionRegistrar {
-
- @Override
- public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
- BeanDefinitionRegistry registry) {
-
- registerImplicitChannelCreator(registry);
- registerDefaultConfiguringBeanFactoryPostProcessor(registry);
- registerIntegrationConfigurationBeanFactoryPostProcessor(registry);
- if (importingClassMetadata != null) {
- // Register MessagingAnnotationPostProcessor with the container.
- registerMessagingAnnotationPostProcessors(importingClassMetadata, registry);
- }
- }
- }
此配置类会向容器中注册很多Bean,我们只关注MessagingAnnotationPostProcessor,这是一个BeanPostProcessor。
- // Abridged excerpt: maps each messaging annotation type to the post-processor that handles it.
- public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean,
- SmartInitializingSingleton {
-
- @Override
- public void afterPropertiesSet() {
- ((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(
- IntegrationContextUtils.DISPOSABLES_BEAN_NAME,
- BeanDefinitionBuilder.genericBeanDefinition(Disposables.class, Disposables::new)
- .getRawBeanDefinition());
- this.postProcessors.put(Filter.class, new FilterAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(Transformer.class, new TransformerAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(ServiceActivator.class, new ServiceActivatorAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(Splitter.class, new SplitterAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(InboundChannelAdapter.class,
- new InboundChannelAdapterAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(BridgeFrom.class, new BridgeFromAnnotationPostProcessor(this.beanFactory));
- this.postProcessors.put(BridgeTo.class, new BridgeToAnnotationPostProcessor(this.beanFactory));
- Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> customPostProcessors =
- setupCustomPostProcessors();
- if (!CollectionUtils.isEmpty(customPostProcessors)) {
- this.postProcessors.putAll(customPostProcessors);
- }
- }
- }
可以看到,它会处理很多注解类型的解析,包括我们用到的@ServiceActivator注解。
- @Override
- public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
- Assert.notNull(this.beanFactory, "BeanFactory must not be null");
-
- Class> beanClass = AopUtils.getTargetClass(bean);
- // 遍历每个方法,解析上述注解
- ReflectionUtils.doWithMethods(beanClass,
- method -> doWithMethod(method, bean, beanName, beanClass),
- ReflectionUtils.USER_DECLARED_METHODS);
- return bean;
- }
此方法会在Bean初始化之后调用,解析上述的所有注解
- // Abridged excerpt: picks the post-processor registered for the given annotation type.
- protected void processAnnotationTypeOnMethod(Object bean, String beanName, Method method,
- Class<? extends Annotation> annotationType, List<Annotation> annotations) {
- // Look up the post-processor for this annotation type. For @ServiceActivator the
- // result is a ServiceActivatorAnnotationPostProcessor.
- MethodAnnotationPostProcessor<?> postProcessor =
- MessagingAnnotationPostProcessor.this.postProcessors.get(annotationType);
- if (postProcessor != null && postProcessor.shouldCreateEndpoint(method, annotations)) {
- Method targetMethod = method;
- if (this.initialized) {
- // Process the method and register the resulting endpoint, if any.
- postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations,
- postProcessor, targetMethod);
- }
- }
- }
继续跟进去
- // Abridged excerpt: turns an annotated method into an endpoint and registers it as a bean.
- private void postProcessMethodAndRegisterEndpointIfAny(Object bean, String beanName, Method method,
- Class<? extends Annotation> annotationType, List<Annotation> annotations,
- MethodAnnotationPostProcessor<?> postProcessor, Method targetMethod) {
- // For @ServiceActivator the result is an EventDrivenConsumer, a subclass of AbstractEndpoint.
- Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
- if (result instanceof AbstractEndpoint) {
- // Register the created endpoint instance with the bean container and initialize it.
- AbstractEndpoint endpoint = (AbstractEndpoint) result;
- String endpointBeanName = generateBeanName(beanName, method, annotationType);
- endpoint.setBeanName(endpointBeanName);
- // Register it as a singleton.
- getBeanFactory().registerSingleton(endpointBeanName, endpoint);
- // Run bean initialization.
- getBeanFactory().initializeBean(endpoint, endpointBeanName);
- }
- }
接下来再看一下EventDrivenConsumer的作用
- // Abridged excerpt: the endpoint that connects a message handler to its input channel.
- public class EventDrivenConsumer extends AbstractEndpoint implements IntegrationConsumer {
- @Override
- protected void doStart() {
- // Subscribe the handler to the input channel, i.e. add it to the channel's dispatcher.
- this.inputChannel.subscribe(this.handler);
- // Start the handler as well when it has a lifecycle of its own.
- if (this.handler instanceof Lifecycle) {
- ((Lifecycle) this.handler).start();
- }
- }
- }
简单总结一下
接收Mqtt消息流程分析
接收消息是由我们配置类中定义的MqttPahoMessageDrivenChannelAdapter来处理的
- /**
- * Subscribing client: listens on the configured topics and forwards every received
- * message to {@code mqttInputChannel}.
- */
- @Bean
- public MessageProducer inbound() {
- String[] topics = {"first_topic"};
- MqttPahoMessageDrivenChannelAdapter adapter =
- new MqttPahoMessageDrivenChannelAdapter(createClientId(),
- mqttClientFactory(), topics);
- adapter.setCompletionTimeout(3_000);
- adapter.setConverter(new DefaultPahoMessageConverter());
- adapter.setQos(2);
- adapter.setOutputChannel(mqttInputChannel());
- return adapter;
- }
它是一个MqttCallback,重写了messageArrived()方法,如果不了解MqttCallback可以查看Spring整合Mqtt简单使用。
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) {
- Message> message = this.getConverter().toMessage(topic, mqttMessage);
- try {
- // 发送消息
- sendMessage(message);
- }
- }
- // Abridged excerpt: forwards a received message to the adapter's output channel.
- protected void sendMessage(Message<?> messageArg) {
- Message<?> message = messageArg;
- MessageChannel messageChannel = getOutputChannel();
- // The actual send is delegated to the MessageChannel.
- this.messagingTemplate.send(messageChannel, message);
- }
MessageChannel(消息通道)委托给自身的消息分发器来处理,找到对应的消息处理器
- /**
- * Message handler for {@code mqttInputChannel}: prints the topic and payload of each
- * received message.
- */
- @Bean
- @ServiceActivator(inputChannel = "mqttInputChannel")
- public MessageHandler handler() {
- return (message -> {
- // String.valueOf tolerates a missing topic header instead of throwing a
- // NullPointerException when the message did not come from the MQTT adapter.
- String topic = String.valueOf(message.getHeaders().get("mqtt_receivedTopic"));
- String payload = message.getPayload().toString();
- System.out.println("消息主题:" + topic);
- System.out.println("消息内容:" + payload);
- });
- }
至此接收Mqtt消息的流程就结束了。
Spring整合Mqtt协议使用到了spring-integration框架,它内部又依赖spring-messaging框架。Spring中消息处理一共有3种角色(实际上不止3种,这里我们仅用到了3种):消息生产者(MessageProducer)、消息通道(MessageChannel)和消息处理器(MessageHandler)。
通过@ServiceActivator注解将具体的消息处理器添加到消息通道中。消息生产者生成一个消息,通过内部的消息通道将消息分发给对应的消息处理器。
注意,这里所说的消息为Spring-messaging框架中的通用消息,和Mqtt消息没有关系。
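Mqtt消息发送的流程为:MqttGateway代理对象 → GatewayProxyFactoryBean的invoke()方法 → MessagingGatewaySupport的send()方法 → DirectChannel(消息通道) → UnicastingDispatcher(消息分发器) → MqttPahoMessageHandler的publish()方法 → Mqtt服务器。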
Mqtt消息接收的流程为: