ActiveMQ与SpringBoot消息接收方式 - 极悦
JMS&ActiveMQ教程
基于JMS的消息传送
ActiveMQ与Spring集成
ActiveMQ与SpringBoot集成
ActiveMQ安全机制
ActiveMQ主从集群

ActiveMQ与SpringBoot消息接收方式

同步接收

jmsTemplate.receive()

一个线程在工作,没接收到就等待,接收到了就往下执行直到程序结束;

如果想循环不断地接收,那么就写个while true循环。

异步接收

使用监听器监听ActiveMQ目的地,当有消息的时候,回调onMessage方法对消息进行处理,ActiveMQ与SpringBoot集成异步接收消息有两种实现方式。

1、@JmsListener注解方式实现

A、 创建SpringBoot工程13-activemq-boot-receiver-async-01

B、 创建的工程的时候,勾选集成ActiveMQ,会在pom.xml中添加如下依赖

<!--SpringBoot集成ActiveMQ的起步依赖-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

C、 在application.properties文件中配置ActiveMQ信息

#配置activemq的连接信息
spring.activemq.broker-url=tcp://192.168.235.128:61616
#目的地
spring.jms.template.default-destination=bootQueue
#默认是缓存了jms的session的,所以主程序发送完消息后,不会退出
# 改为false主程序才可以退出 从SpringBoot2.1.0以后新增的
spring.jms.cache.enabled=false

D、 在com.bjpowernode.activemq.service包下创建MessageService类接收消息

@Service
public class MessageService {
    /**
     * SpringBoot程序启动后,会扫描到当前service类
     * 监听器对象会被创建
     * 通过注解指定了该监听器对象监听的目的地
     * 如果目的地上有消息需要接收,会调用我们自定义的方法,并传递接收到的消息到方法中
     * 方法名可以任意
     */
    @JmsListener(destination="${spring.jms.template.default-destination}")
    public void receiveMessage(Message message){
        if(message instanceof TextMessage){
            try {
                String text = ((TextMessage) message).getText();
                System.out.println("SpringBoot异步接收到的消息为:" + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

E、 运行Application程序,发送一个消息,查看接收效果

2、@Configuration配置方式实现

这种方式其实就是将spring接收异步请求需要配置的xml文件信息,放到配置类中。

A、 创建SpringBoot工程13-activemq-boot-receiver-async-02

B、 创建的工程的时候,勾选集成ActiveMQ,会在pom.xml中添加如下依赖

<!--SpringBoot集成ActiveMQ的起步依赖-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

C、 在application.properties文件中配置ActiveMQ信息

#配置activemq的连接信息
spring.activemq.broker-url=tcp://192.168.235.128:61616
#目的地
spring.jms.template.default-destination=bootQueue
#默认是缓存了jms的session的,所以主程序发送完消息后,不会退出
# 改为false主程序才可以退出 从SpringBoot2.1.0以后新增的
spring.jms.cache.enabled=false

D、 自定义监听器接收处理消息(可以从12-activemq-spring-receiver-async拷贝)

该监听器没有在配置文件中注册bean,所有需要加@Component让Spring容器扫描

E、 在com.bjpowernode.activemq.config包下创建ActiveMQConfig类,用于替换我们前面在applicationContex-jms中配置的信息

@Configuration//相当于applicationContext-jms.xml文件
public class ActiveMQConfig {
    /*
        <!-- 配置一个连接工厂 -->
         <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
         <property name="brokerURL" value="tcp://192.168.235.128:61616"/>
         </bean>
        因为SpringBoot已经自动配置好了ActiveMQ的连接工厂,所有我们在配置文件中就不需要对这个bean进行配置
        我们在使用的时候,直接注入即可
     */
    @Autowired
    private ActiveMQConnectionFactory connectionFactory;

    @Autowired //注入我们自定义的监听器
    private MyMessageListener myMessageListener;

    //通过@Value注解接收SpringBoot配置文件中的目的地配置
    @Value("${spring.jms.template.default-destination}")
    private String destination;

    /**
         <!-- 配置一个sping监听器的容器 -->
         <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
         <!--引用连接工厂-->
         <property name="connectionFactory" ref="connectionFactory"/>
         <!--指定监听的目的地-->
         <property name="destinationName" value="springQueue"/>
         <!--监听到消息后,会回调onMessage方法,我们在自定义的监听器中对onMessage方法进行重写,完成消息的接收-->
         <property name="messageListener" ref="myMessageListener" />

         <!--pubSubDomain=false表示点对点消息,true表示发布订阅消息,默认是false-->
         <property name="pubSubDomain" value="true"/>
         </bean>
     */
    @Bean //@Bean注解就相当于配置文件的bean标签
    public DefaultMessageListenerContainer defaultMessageListenerContainer(){
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setDestinationName(destination);
        listenerContainer.setMessageListener(myMessageListener);
        //目前我们默认就是点对点的消息,暂时不配置发布订阅
        return listenerContainer;
    }
}

 F、 运行Application程序,发送一个消息,查看接收效果

全部教程