ActiveMQ教程
同步接收
receive()方法接收消息叫同步接收一个线程在工作,接收到消息后,执行结束只能接收一次消息,如果想不间断地接收消息,写一个while true循环。
使用监听器接收消息,这种接收方式叫异步接收,两个线程在工作,一个负责接收消息,一个负责处理消息。
为了实现不间断的监听接收消息,在开发代码的时候,我们不应该关闭连接。
注意
在同一个consumer中,我们不能同时使用这2种接收方式;
比如在使用listener的情况下,当调用receive()方法将会获得一个Exception;
● 监听器监听指定目的地的消息
● 如果有消息,那么监听器回调onMessage方法,并将消息传递给该方法
● 在该方法中对消息进行处理
● 拷贝QueueReceiver类,新的类名字为QueueListenerReceiver
● 将对消息的处理放到监听器的onMessage方法中
messageConsumer = session.createConsumer(destination,messageSelector);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
//判断消息类型是否为文本消息
if(message instanceof TextMessage){
String text = ((TextMessage) message).getText();
System.out.println(text);
}else if(message instanceof ObjectMessage){
User user = (User) ((ObjectMessage) message).getObject();
System.out.println("接收的对象:"+user.getId() +"::"+ user.getName()+"::"+user.getAge());
}else if(message instanceof MapMessage){
System.out.println(((MapMessage) message).getString("school")
+"办学"+((MapMessage) message).getInt("age") +"年了");
}else if(message instanceof BytesMessage){
boolean flag = ((BytesMessage) message).readBoolean();
String s = ((BytesMessage) message).readUTF();
System.out.println(flag +":::"+s);
}else if(message instanceof StreamMessage){
Long lo = ((StreamMessage) message).readLong();
String s = ((StreamMessage) message).readString();
System.out.println(lo +":::"+s);
}
//手动确认消息,如果不确认,消息不会被成功消费
message.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
});