• Spring整合mqtt原理分析


    代码示例

    配置消息处理器

    1. import java.util.UUID;
    2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.Configuration;
    5. import org.springframework.integration.annotation.IntegrationComponentScan;
    6. import org.springframework.integration.annotation.ServiceActivator;
    7. import org.springframework.integration.channel.DirectChannel;
    8. import org.springframework.integration.config.EnableIntegration;
    9. import org.springframework.integration.core.MessageProducer;
    10. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    11. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    12. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    13. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    14. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    15. import org.springframework.messaging.MessageChannel;
    16. import org.springframework.messaging.MessageHandler;
    17. @Configuration
    18. @IntegrationComponentScan
    19. @EnableIntegration
    20. public class MqttClientConfig {
    21. /**
    22. * 连接工厂.
    23. */
    24. @Bean
    25. public MqttPahoClientFactory mqttClientFactory() {
    26. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    27. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    28. mqttConnectOptions.setUserName("test1");
    29. mqttConnectOptions.setPassword("123456".toCharArray());
    30. mqttConnectOptions.setServerURIs(new String[]{"tcp://xxx:1883"});
    31. mqttConnectOptions.setKeepAliveInterval(2);
    32. mqttConnectOptions.setAutomaticReconnect(true);
    33. factory.setConnectionOptions(mqttConnectOptions);
    34. return factory;
    35. }
    36. private String createClientId() {
    37. return UUID.randomUUID().toString();
    38. }
    39. /**
    40. * 配置client,发布.
    41. */
    42. @Bean
    43. @ServiceActivator(inputChannel = "mqttOutboundChannel")
    44. public MessageHandler mqttOutbound() {
    45. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
    46. createClientId(), mqttClientFactory());
    47. messageHandler.setAsync(true);
    48. messageHandler.setDefaultQos(2);
    49. messageHandler.setDefaultRetained(false); //不保留消息
    50. return messageHandler;
    51. }
    52. @Bean
    53. public MessageChannel mqttOutboundChannel() {
    54. return new DirectChannel();
    55. }
    56. //接收通道
    57. @Bean
    58. public MessageChannel mqttInputChannel() {
    59. return new DirectChannel();
    60. }
    61. /**
    62. * 配置client,监听的topic.
    63. */
    64. @Bean
    65. public MessageProducer inbound() {
    66. String[] topics = {"first_topic"};
    67. MqttPahoMessageDrivenChannelAdapter adapter =
    68. new MqttPahoMessageDrivenChannelAdapter(createClientId(),
    69. mqttClientFactory(), topics);
    70. adapter.setCompletionTimeout(3_000);
    71. adapter.setConverter(new DefaultPahoMessageConverter());
    72. adapter.setQos(2);
    73. adapter.setOutputChannel(mqttInputChannel());
    74. return adapter;
    75. }
    76. /**
    77. * 消息处理器
    78. */
    79. @Bean
    80. @ServiceActivator(inputChannel = "mqttInputChannel")
    81. public MessageHandler handler() {
    82. return (message -> {
    83. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
    84. String payload = message.getPayload().toString();
    85. System.out.println("消息主题:" + topic);
    86. System.out.println("消息内容:" + payload);
    87. });
    88. }
    89. }

    配置消息通道MessageChannel和消息处理器,消息通道有两个,一个用来发送消息,一个用来接收消息。

    配置消息网关

    1. import org.springframework.integration.annotation.MessagingGateway;
    2. import org.springframework.integration.mqtt.support.MqttHeaders;
    3. import org.springframework.messaging.handler.annotation.Header;
    4. import org.springframework.stereotype.Component;
    5. @Component
    6. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    7. public interface MqttGateway {
    8. void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
    9. }

    加上@Component注解是为了IDE不报错,不加也不影响功能。

    客户端使用

    1. import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    2. public class TestSpringMqtt {
    3. public static void main(String[] args) {
    4. // 创建容器上下文
    5. AnnotationConfigApplicationContext context =
    6. new AnnotationConfigApplicationContext(MqttClientConfig.class);
    7. MqttGateway mqttGateway = (MqttGateway) context.getBean("mqttGateway");
    8. // 获取消息网关Bean对象并发送消息
    9. mqttGateway.sendToMqtt("hello", "first_topic");
    10. }
    11. }

    原理分析

    @IntegrationComponentScan注解

    先来分析@IntegrationComponentScan注解,它会导入IntegrationComponentScanRegistrar配置类。

    1. public class IntegrationComponentScanRegistrar implements ImportBeanDefinitionRegistrar,
    2. ResourceLoaderAware, EnvironmentAware {
    3. private final Map componentRegistrars =
    4. new HashMap();
    5. // 过滤类型为@MessagingGateway注解,会交给MessagingGatewayRegistrar来处理
    6. public IntegrationComponentScanRegistrar() {
    7. this.componentRegistrars.put(new AnnotationTypeFilter(MessagingGateway.class, true),
    8. new MessagingGatewayRegistrar());
    9. }
    10. @Override
    11. public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    12. Map componentScan =
    13. importingClassMetadata.getAnnotationAttributes(IntegrationComponentScan.class.getName());
    14. Collection basePackages = getBasePackages(importingClassMetadata, registry);
    15. if (basePackages.isEmpty()) {
    16. basePackages = Collections.singleton(ClassUtils.getPackageName(importingClassMetadata.getClassName()));
    17. }
    18. ClassPathScanningCandidateComponentProvider scanner =
    19. new ClassPathScanningCandidateComponentProvider(false, this.environment) {
    20. @Override
    21. protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
    22. return beanDefinition.getMetadata().isIndependent()
    23. && !beanDefinition.getMetadata().isAnnotation();
    24. }
    25. };
    26. filter(registry, componentScan, scanner); // NOSONAR - never null
    27. scanner.setResourceLoader(this.resourceLoader);
    28. // 开始扫描
    29. for (String basePackage : basePackages) {
    30. Set candidateComponents = scanner.findCandidateComponents(basePackage);
    31. for (BeanDefinition candidateComponent : candidateComponents) {
    32. if (candidateComponent instanceof AnnotatedBeanDefinition) {
    33. for (ImportBeanDefinitionRegistrar registrar : this.componentRegistrars.values()) {
    34. // 交给具体的注册器来处理,这里就是MessagingGatewayRegistrar
    35. registrar.registerBeanDefinitions(((AnnotatedBeanDefinition) candidateComponent).getMetadata(),
    36. registry);
    37. }
    38. }
    39. }
    40. }
    41. }
    42. }

    此注解作用类似于@ComponentScan注解,会扫描包含@MessagingGateway注解的Class,并注册到容器中。继续分析MessagingGatewayRegistrar

    1. public class MessagingGatewayRegistrar implements ImportBeanDefinitionRegistrar {
    2. @Override
    3. public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    4. if (importingClassMetadata != null && importingClassMetadata.isAnnotated(MessagingGateway.class.getName())) {
    5. Map annotationAttributes =
    6. importingClassMetadata.getAnnotationAttributes(MessagingGateway.class.getName());
    7. BeanDefinitionReaderUtils.registerBeanDefinition(this.parse(annotationAttributes), registry);
    8. }
    9. }
    10. public BeanDefinitionHolder parse(Map gatewayAttributes) {
    11. // 创建具体的BeanDefinition
    12. String defaultPayloadExpression = (String) gatewayAttributes.get("defaultPayloadExpression");
    13. // 核心,接口的具体实现类为GatewayProxyFactoryBean
    14. BeanDefinitionBuilder gatewayProxyBuilder =
    15. BeanDefinitionBuilder.genericBeanDefinition(GatewayProxyFactoryBean.class);
    16. AbstractBeanDefinition beanDefinition = gatewayProxyBuilder.getBeanDefinition();
    17. beanDefinition.addMetadataAttribute(new BeanMetadataAttribute(IntegrationConfigUtils.FACTORY_BEAN_OBJECT_TYPE,
    18. serviceInterface));
    19. return new BeanDefinitionHolder(beanDefinition, id);
    20. }
    21. }

    使用GatewayProxyFactoryBean类来作为接口的具体实现。

    1. public class GatewayProxyFactoryBean extends AbstractEndpoint
    2. implements TrackableComponent, FactoryBean, MethodInterceptor, BeanClassLoaderAware {
    3. protected void onInit() {
    4. synchronized (this.initializationMonitor) {
    5. // 创建动态代理对象,这里的serviceInterface就是我们定义的MqttGateway,
    6. // 拦截器就是GatewayProxyFactoryBean自身
    7. this.serviceProxy = new ProxyFactory(this.serviceInterface, this).getProxy(this.beanClassLoader);
    8. }
    9. }
    10. @Override
    11. public Object getObject() {
    12. if (this.serviceProxy == null) {
    13. this.onInit();
    14. }
    15. // 返回创建的代理对象
    16. return this.serviceProxy;
    17. }
    18. }
    19. GatewayProxyFactoryBean使用动态代理(底层是Cglib或JDK)创建一个代理对象,拦截器就是自身,所以当我们调用MqttGateway的sendToMqtt()方法时,
      就会被拦截到GatewayProxyFactoryBean的invoke()方法。

      发送Mqtt消息流程分析

      从GatewayProxyFactoryBean类的invoke()方法开始

      1. @Override
      2. @Nullable
      3. public Object invoke(final MethodInvocation invocation) throws Throwable { // NOSONAR
      4. // 继续执行
      5. return doInvoke(invocation, true);
      6. }
      7. @Nullable
      8. private Object invokeGatewayMethod(MethodInvocation invocation, boolean runningOnCallerThread) {
      9. Method method = invocation.getMethod();
      10. // 根据method找到对应的MethodInvocationGateway
      11. MethodInvocationGateway gateway = this.gatewayMap.get(method);
      12. // 发送消息并接收响应
      13. response = sendOrSendAndReceive(invocation, gateway, shouldReturnMessage, shouldReply);
      14. // 处理返回值类型
      15. return response(gateway.returnType, shouldReturnMessage, response);
      16. }

      具体发送的处理会交给MethodInvocationGateway来处理,进入它的父类MessagingGatewaySupport的send()方法

      1. protected void send(Object object) {
      2. // 根据@MessagingGateway注解中配置的defaultRequestChannel属性值从容器中获取到消息通道Bean对象
      3. MessageChannel channel = getRequestChannel();
      4. try {
      5. this.messagingTemplate.convertAndSend(channel, object, this.historyWritingPostProcessor);
      6. }
      7. }

      这个消息通道对象就是我们在MqttClientConfig配置类中声明的如下Bean,具体类型为DirectChannel。

      1. @Bean
      2. public MessageChannel mqttOutboundChannel() {
      3. return new DirectChannel();
      4. }

      我们继续分析messagingTemplate的convertAndSend()方法,最终进入GenericMessagingTemplate的doSend()方法

      1. protected final void doSend(MessageChannel channel, Message message, long timeout) {
      2. Message messageToSend = message;
      3. // 委托给channel对象来发送消息,这里的channel具体类型为上面定义的DirectChannel
      4. boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));
      5. }

      进入DirectChannel的父类AbstractMessageChannel的send()方法

      1. @Override // NOSONAR complexity
      2. public boolean send(Message messageArg, long timeout) {
      3. Message message = messageArg;
      4. try {
      5. // 消息类型转换
      6. message = convertPayloadIfNecessary(message);
      7. // 实际发送消息
      8. sent = doSend(message, timeout);
      9. return sent;
      10. }
      11. }
      12. @Override
      13. protected boolean doSend(Message message, long timeout) {
      14. try {
      15. //这里的MessageDispatcher(消息分发器)类型为UnicastingDispatcher
      16. return getRequiredDispatcher().dispatch(message);
      17. }
      18. }

      DirectChannel中定义的消息分发器类型为UnicastingDispatcher,这是一个单播分发器,表示这条消息只会由一个消息处理器来处理,区别于广播分发器BroadcastingDispatcher。

      1. private boolean doDispatch(Message message) {
      2. // 先尝试优化处理,如果只有一个消息处理器的话,直接交给它处理
      3. if (tryOptimizedDispatch(message)) {
      4. return true;
      5. }
      6. boolean success = false;
      7. // 有多个消息处理器的情况
      8. Iterator handlerIterator = getHandlerIterator(message);
      9. List exceptions = null;
      10. // 有一个处理成功就退出循环
      11. while (!success && handlerIterator.hasNext()) {
      12. MessageHandler handler = handlerIterator.next();
      13. try {
      14. // 消息处理器处理消息
      15. handler.handleMessage(message);
      16. success = true; // we have a winner.
      17. }
      18. }
      19. return success;
      20. }

      其实这里的消息处理器就是我们在MqttClientConfig配置类中定义的如下Bean

      1. /**
      2. * 配置client,发布.
      3. */
      4. @Bean
      5. @ServiceActivator(inputChannel = "mqttOutboundChannel")
      6. public MessageHandler mqttOutbound() {
      7. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
      8. createClientId(), mqttClientFactory());
      9. messageHandler.setAsync(true);
      10. messageHandler.setDefaultQos(2);
      11. messageHandler.setDefaultRetained(false); //不保留消息
      12. return messageHandler;
      13. }

      继续分析MqttPahoMessageHandler是如何处理消息的

      1. public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
      2. implements MqttCallback, ApplicationEventPublisherAware {
      3. @Override
      4. protected void handleMessageInternal(Message message) throws Exception {
      5. Object mqttMessage = this.converter.fromMessage(message, Object.class);
      6. String topic = this.topicProcessor.processMessage(message);
      7. // 发布消息
      8. publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
      9. }
      10. @Override
      11. protected void publish(String topic, Object mqttMessage, Message message) throws Exception {
      12. // 根据MqttPahoClientFactory创建MqttAsyncClient对象,向Mqtt服务器发送Mqtt消息
      13. IMqttDeliveryToken token = checkConnection()
      14. .publish(topic, (MqttMessage) mqttMessage);
      15. }
      16. }

      至此Mqtt消息发送的流程就结束了,总结一下

      1. 定义MqttGateway接口,包含@MessagingGateway注解
      2. @IntegrationComponentScan会引入IntegrationComponentScanRegistrar配置类,扫描包含@MessagingGateway注解的Class,
        使用GatewayProxyFactoryBean来实现此接口。
      3. GatewayProxyFactoryBean创建动态代理对象,拦截发送Mqtt消息的处理,委托给对应的MessageChannel(消息通道),
        此消息通道是通过@MessagingGateway注解的defaultRequestChannel属性来配置的。
      4. 消息通道对象委托给消息分发器来处理,具体为UnicastingDispatcher(单播分发器)
      5. 消息分发器会查找到多个对应的消息处理器(MessageHandler),交给它们处理

      但Spring是什么时候向消息分发器中添加消息处理器的呢,这个就涉及到@ServiceActivator注解的功能了。而@ServiceActivator注解功能的开启需要@EnableIntegration注解,
      所以接下来继续分析@EnableIntegration注解。

      @EnableIntegration注解

      从注解的名称可以简单看出来,开启集成功能,它会引入IntegrationRegistrar配置类。

      1. public class IntegrationRegistrar implements ImportBeanDefinitionRegistrar {
      2. @Override
      3. public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
      4. BeanDefinitionRegistry registry) {
      5. registerImplicitChannelCreator(registry);
      6. registerDefaultConfiguringBeanFactoryPostProcessor(registry);
      7. registerIntegrationConfigurationBeanFactoryPostProcessor(registry);
      8. if (importingClassMetadata != null) {
      9. // 向容器中注册MessagingAnnotationPostProcessor
      10. registerMessagingAnnotationPostProcessors(importingClassMetadata, registry);
      11. }
      12. }
      13. }

      此配置类会向容器中注册很多Bean,我们只关注MessagingAnnotationPostProcessor,这是一个BeanPostProcessor

      1. public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean,
      2. SmartInitializingSingleton {
      3. @Override
      4. public void afterPropertiesSet() {
      5. ((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(
      6. IntegrationContextUtils.DISPOSABLES_BEAN_NAME,
      7. BeanDefinitionBuilder.genericBeanDefinition(Disposables.class, Disposables::new)
      8. .getRawBeanDefinition());
      9. this.postProcessors.put(Filter.class, new FilterAnnotationPostProcessor(this.beanFactory));
      10. this.postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.beanFactory));
      11. this.postProcessors.put(Transformer.class, new TransformerAnnotationPostProcessor(this.beanFactory));
      12. this.postProcessors.put(ServiceActivator.class, new ServiceActivatorAnnotationPostProcessor(this.beanFactory));
      13. this.postProcessors.put(Splitter.class, new SplitterAnnotationPostProcessor(this.beanFactory));
      14. this.postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.beanFactory));
      15. this.postProcessors.put(InboundChannelAdapter.class,
      16. new InboundChannelAdapterAnnotationPostProcessor(this.beanFactory));
      17. this.postProcessors.put(BridgeFrom.class, new BridgeFromAnnotationPostProcessor(this.beanFactory));
      18. this.postProcessors.put(BridgeTo.class, new BridgeToAnnotationPostProcessor(this.beanFactory));
      19. Mapextends Annotation>, MethodAnnotationPostProcessor> customPostProcessors =
      20. setupCustomPostProcessors();
      21. if (!CollectionUtils.isEmpty(customPostProcessors)) {
      22. this.postProcessors.putAll(customPostProcessors);
      23. }
      24. }
      25. }

      可以看到,它会处理很多注解类型的解析,包括我们用到的@ServiceActivator注解。

      1. @Override
      2. public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
      3. Assert.notNull(this.beanFactory, "BeanFactory must not be null");
      4. Class beanClass = AopUtils.getTargetClass(bean);
      5. // 遍历每个方法,解析上述注解
      6. ReflectionUtils.doWithMethods(beanClass,
      7. method -> doWithMethod(method, bean, beanName, beanClass),
      8. ReflectionUtils.USER_DECLARED_METHODS);
      9. return bean;
      10. }

      此方法会在Bean初始化之后调用,解析上述的所有注解

      1. protected void processAnnotationTypeOnMethod(Object bean, String beanName, Method method,
      2. Class annotationType, List annotations) {
      3. // 根据对应的注解类型找到对应的注解处理器,
      4. // 以@ServiceActivator注解为例,得到的postProcessor类型就是ServiceActivatorAnnotationPostProcessor
      5. MethodAnnotationPostProcessor postProcessor =
      6. MessagingAnnotationPostProcessor.this.postProcessors.get(annotationType);
      7. if (postProcessor != null && postProcessor.shouldCreateEndpoint(method, annotations)) {
      8. Method targetMethod = method;
      9. if (this.initialized) {
      10. // 处理此Method并注册Endpoint
      11. postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations,
      12. postProcessor, targetMethod);
      13. }
      14. }
      15. }

      继续跟进去

      1. private void postProcessMethodAndRegisterEndpointIfAny(Object bean, String beanName, Method method,
      2. Class annotationType, List annotations,
      3. MethodAnnotationPostProcessor postProcessor, Method targetMethod) {
      4. // 以@ServiceActivator注解为例,处理之后得到result为EventDrivenConsumer类型,是AbstractEndpoint的子类
      5. Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
      6. if (result instanceof AbstractEndpoint) {
      7. // 将这个创建的endpoint对象实例注册到Bean容器中并初始化
      8. AbstractEndpoint endpoint = (AbstractEndpoint) result;
      9. String endpointBeanName = generateBeanName(beanName, method, annotationType);
      10. endpoint.setBeanName(endpointBeanName);
      11. // 作为单例注册到容器中
      12. getBeanFactory().registerSingleton(endpointBeanName, endpoint);
      13. // Bean初始化
      14. getBeanFactory().initializeBean(endpoint, endpointBeanName);
      15. }
      16. }

      接下来再看一下EventDrivenConsumer的作用

      1. public class EventDrivenConsumer extends AbstractEndpoint implements IntegrationConsumer {
      2. @Override
      3. protected void doStart() {
      4. // 将消息处理器添加到消息通道中,具体来说就是添加到消息分发器中
      5. this.inputChannel.subscribe(this.handler);
      6. if (this.handler instanceof Lifecycle) {
      7. ((Lifecycle) this.handler).start();
      8. }
      9. }
      10. }

      简单总结一下

      1. @EnableIntegration注解开启对@ServiceActivator注解的处理。
      2. @ServiceActivator注解将此消息处理器添加到配置的消息通道中,每个消息通道都包含一个消息分发器,实际上是添加到消息分发器中。
        消息通道是通过@ServiceActivator注解的inputChannel属性来配置的。

      接收Mqtt消息流程分析

      接收消息是由我们配置类中定义的MqttPahoMessageDrivenChannelAdapter来处理的

      1. /**
      2. * 配置client,监听的topic.
      3. */
      4. @Bean
      5. public MessageProducer inbound() {
      6. String[] topics = {"first_topic"};
      7. MqttPahoMessageDrivenChannelAdapter adapter =
      8. new MqttPahoMessageDrivenChannelAdapter(createClientId(),
      9. mqttClientFactory(), topics);
      10. adapter.setCompletionTimeout(3_000);
      11. adapter.setConverter(new DefaultPahoMessageConverter());
      12. adapter.setQos(2);
      13. adapter.setOutputChannel(mqttInputChannel());
      14. return adapter;
      15. }

      它是一个MqttCallback,重写了messageArrived()方法,如果不了解MqttCallback可以查看Spring整合Mqtt简单使用

      1. @Override
      2. public void messageArrived(String topic, MqttMessage mqttMessage) {
      3. Message message = this.getConverter().toMessage(topic, mqttMessage);
      4. try {
      5. // 发送消息
      6. sendMessage(message);
      7. }
      8. }
      9. protected void sendMessage(Message messageArg) {
      10. Message message = messageArg;
      11. MessageChannel messageChannel = getOutputChannel();
      12. // 会委托给MessageChannel来发送
      13. this.messagingTemplate.send(messageChannel, message);
      14. }

      MessageChannel(消息通道)委托给自身的消息分发器来处理,找到对应的消息处理器

      1. /**
      2. * 消息处理器
      3. */
      4. @Bean
      5. @ServiceActivator(inputChannel = "mqttInputChannel")
      6. public MessageHandler handler() {
      7. return (message -> {
      8. String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
      9. String payload = message.getPayload().toString();
      10. System.out.println("消息主题:" + topic);
      11. System.out.println("消息内容:" + payload);
      12. });
      13. }

      至此接收Mqtt消息的的流程就结束了。

      总结

      Spring整合Mqtt协议使用到了spring-integration框架,它内部又依赖spring-messaging框架。Spring中消息处理一共有3种角色(实际上不止3种,这里我们仅用到了3种)

      1. 消息生产者(MessageProducer),我们配置的MqttPahoMessageDrivenChannelAdapter和MqttGateway接口都是这种角色,内部需要关联一个消息通道。
      2. 消息处理器(MessageHandler),我们配置的MqttPahoMessageHandler和lambda表达式实现都是这种角色。
      3. 消息通道(MessageChannel),内部包含一个消息分发器,消息分发器中包含多个消息处理器。

      通过@ServiceActivator注解将具体的消息处理器添加到消息通道中。消息生产者生成一个消息,通过内部的消息通道将消息分发给对应的消息处理器。
      注意,这里所说的消息为Spring-messaging框架中的通用消息,和Mqtt消息没有关系。

      Mqtt消息发送的流程为:

      • MqttGateway生成一个Spring消息,通过消息通道找到对应的消息处理器MqttPahoMessageHandler,它会将Mqtt消息发送给Mqtt服务器。

      Mqtt消息接收的流程为:

      • MqttPahoMessageDrivenChannelAdapter是一个Mqtt消息的监听器,当接收到一个Mqtt消息后,生成一个Spring消息,通过消息通道找到对应的消息处理器,
        就是我们自己定义的MqttClientConfig配置类中的handler()方法。

      参考

      Spring Integration 中文手册(完整版)

    20. 相关阅读:
      yolov5-tracking-xxxsort yolov5融合六种跟踪算法(三)--目标跟踪
      改进的Salp Swarm优化算法(ISSA)(Matlab代码实现)
      nodejs+vue+elementui医院挂号预约管理系统4n9w0
      redis 持久化原理
      Kubernetes包管理工具-Helm3使用
      Java面向对象进阶2——继承
      2020年大厂Java面试前复习的正确姿势(800+面试题答案解析)
      [SpringMVC笔记] SpringMVC-06-JSON数据传递参数
      stm32flymcu烧写几次后就超时无应答,如何解决?
      spring ioc
    21. 原文地址:https://blog.csdn.net/xrq1995/article/details/127690595