主题(配置的模糊匹配,为了并发):VirtualTopic/device/sendData/12312
- package com
-
- import cn.hutool.core.date.DateUtil;
- import cn.hutool.core.util.NumberUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.json.JSONUtil;
-
- import org.apache.activemq.ActiveMQMessageAudit;
- import org.apache.activemq.command.ActiveMQBytesMessage;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.stereotype.Component;
-
- import javax.jms.Message;
- import java.nio.charset.Charset;
- import java.util.Date;
-
- @Component
- public class MessageHandler {
-
- @JmsListener(destination = "deviceQueue.receiveDate", containerFactory = "queueListener", concurrency = "1-3")
- public void deviceMessage(ActiveMQBytesMessage message) {
- System.out.println(StrUtil.str(message.getContent().getData(), Charset.defaultCharset()));
- System.out.println("###################" + message + "###################");
- }
- }
控制台打印
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <!-- START SNIPPET: example -->
- <beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <!-- Allows us to use system properties as variables in this configuration file -->
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="locations">
- <value>file:${activemq.conf}/credentials.properties</value>
- </property>
- </bean>
-
- <!--
- The <broker> element is used to configure the ActiveMQ broker.
- -->
- <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
-
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <policyEntry topic=">" >
- <!-- The constantPendingMessageLimitStrategy is used to prevent
- slow topic consumers to block producers and affect other consumers
- by limiting the number of messages that are retained
- For more information, see:
-
- http://activemq.apache.org/slow-consumer-handling.html
-
- -->
- <pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="1000"/>
- </pendingMessageLimitStrategy>
- </policyEntry>
- </policyEntries>
- </policyMap>
- </destinationPolicy>
-
-
- <!--
- The managementContext is used to configure how ActiveMQ is exposed in
- JMX. By default, ActiveMQ uses the MBean server that is started by
- the JVM. For more information, see:
-
- http://activemq.apache.org/jmx.html
- -->
- <managementContext>
- <managementContext createConnector="false"/>
- </managementContext>
-
- <!--
- Configure message persistence for the broker. The default persistence
- mechanism is the KahaDB store (identified by the kahaDB tag).
- For more information, see:
-
- http://activemq.apache.org/persistence.html
- -->
- <persistenceAdapter>
- <kahaDB directory="${activemq.data}/kahadb"/>
- </persistenceAdapter>
-
-
- <!--
- The systemUsage controls the maximum amount of space the broker will
- use before disabling caching and/or slowing down producers. For more information, see:
- http://activemq.apache.org/producer-flow-control.html
- -->
- <systemUsage>
- <systemUsage>
- <memoryUsage>
- <memoryUsage percentOfJvmHeap="70" />
- </memoryUsage>
- <storeUsage>
- <storeUsage limit="100 gb"/>
- </storeUsage>
- <tempUsage>
- <tempUsage limit="50 gb"/>
- </tempUsage>
- </systemUsage>
- </systemUsage>
-
- <!--
- The transport connectors expose ActiveMQ over a given protocol to
- clients and other brokers. For more information, see:
-
- http://activemq.apache.org/configuring-transports.html
- -->
- <transportConnectors>
- <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
- <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- </transportConnectors>
-
- <!-- destroy the spring context on shutdown to stop jetty -->
- <shutdownHooks>
- <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
- </shutdownHooks>
-
- <!-- 消息订阅 -->
- <destinationInterceptors>
- <virtualDestinationInterceptor>
- <virtualDestinations>
- <compositeTopic name="VirtualTopic.device.sendData.*">
- <forwardTo>
- <queue physicalName="deviceQueue.receiveDate" />
- </forwardTo>
- </compositeTopic>
- </virtualDestinations>
- </virtualDestinationInterceptor>
- </destinationInterceptors>
-
- <plugins>
- <simpleAuthenticationPlugin>
- <users>
- <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
- <!--<authenticationUser username="user" password="password" groups="users"/>
- <authenticationUser username="guest" password="password" groups="guests"/> -->
- </users>
- </simpleAuthenticationPlugin>
- </plugins>
- </broker>
-
- <!--
- Enable web consoles, REST and Ajax APIs and demos
- The web consoles requires by default login, you can disable this in the jetty.xml file
-
- Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
- -->
- <import resource="jetty.xml"/>
-
- </beans>
- <!-- END SNIPPET: example -->