简单模式
很简单:生产者、队列和消费者。生产者向队列发送消息,消费者监听队列并消费消息
工作模式
工作:一个生产者,一个队列和多个消费者。生产者向队列发送消息,多个消费者监听同一个队列的消费消息
发布/订阅模式
发布/订阅:发布/订阅模式包括一个生产者、一个交换机、多个队列和多个消费者。交换机(Exchange)直接绑定到队列。生产者通过交换机(Exchange)将消息存储在绑定到交换机的队列中,消费者监听队列并消费
路由模式
路由:路由模式可以根据路由键将消息发送到指定队列。Exchange 和队列通过路由键绑定。生产者通过 Exchange 和路由键将消息准确地发送到队列。消费者监听队列并消费消息
主题模式
主题:主题模式支持在路由模式的基础上进行通配符操作。交换机会根据通配符将消息存入匹配队列,消费者监听队列并消费
标头模式
Header:header模式取消了路由key,而是使用header中的key/value对来匹配。匹配成功后,会通过交换机将消息发送到队列中,由消息制造者获取和消费
RPC 模式
RPC:RPC方式主要用于获取消费者的处理结果。通常,生产者将消息发送给消费者。消费者收到消息并消费后,将处理结果返回给生产者
首先,搭建SpringBoot项目,在POM XML文件中添加如下依赖
<依赖>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</依赖>
<依赖>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</依赖>
修改配置文件,添加如下RabbitMQ配置
服务器:
port: 8888 # 设置端口号
Spring:
rabbitMQ:
host: 127.0.0.1 # 设置 RabbitMQ 的主机
port: 5672 # 设置 RabbitMQ 服务端口
username: guest # 设置 RabbitMQ 用户名
password: guest # 设置 RabbitMQ 密码
新的公共常量类
public interface RabbitConstant {
/**
* 简单模式
*/
String SIMPLE_QUEUE_NAME = "simple_queue";
/**
* 工作模式
*/
String WORK_QUEUE_NAME = "work_queue";
/**
* 发布/订阅模式
*/
String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange";
字符串 PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue";
字符串 PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue";
/**
* 路由模式
*/
String ROUTING_EXCHANGE_NAME = "routing_exchange";
字符串 ROUTING_FIRST_QUEUE_NAME = "routing_first_queue";
字符串 ROUTING_SECOND_QUEUE_NAME = "routing_second_queue";
字符串 ROUTING_THIRD_QUEUE_NAME = "routing_third_queue";
字符串 ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key";
字符串 ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key";
字符串 ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key";
/**
* 主题模式
*/
String TOPICS_EXCHANGE_NAME = "topics_exchange";
字符串 TOPICS_FIRST_QUEUE_NAME = "topics_first_queue";
字符串 TOPICS_SECOND_QUEUE_NAME = "
字符串 TOPICS_THIRD_QUEUE_NAME = "topics_third_queue";
String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key";
String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key";
String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key";
字符串 TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#";
字符串 TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#";
字符串 TOPICS_ROUTING_KEY_THRID_WILDCARD = "*.third.*";
/**
* 标题模式
*/
String HEADER_EXCHANGE_NAME = "header_exchange";
字符串 HEADER_FIRST_QUEUE_NAME = "header_first_queue";
字符串 HEADER_SECOND_QUEUE_NAME = "header_second_queue";
/**
* rpc 模式
*/
String RPC_QUEUE_NAME = "rpc_queue";
}
添加一个Controller请求类(用于验证结果,最后可以添加)
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Message;
导入 org.springframework.amqp.core.MessageProperties;
导入 org.springframework.amqp.rabbit.core.RabbitTemplate;
导入 org.springframework.beans.factory.annotation.Autowired;
导入 org.springframework.web.bind.annotation.GetMapping;
导入 org.springframework.web.bind.annotation.RestController;
导入 java.nio.charset.StandardCharsets;
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/simple")
public void simple() {
rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "你好世界!");
}
@GetMapping(value = "/work")
public void work() {
rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
}
@GetMapping(value = "/pubsub")
public void pubsub() {
rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "发布/订阅你好");
}
@GetMapping(value = "/routing")
public void routing() {
// 向第一个队列发送消息
rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "路由你好");
}
@GetMapping(value = "/topics")
public void topics() {
// 向第一个队列发送消息。这时候队列可以接收到消息,因为队列的通配符是#first.#,而routing_key是topics first。路由。键,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
// 向第二个队列发送消息。这时候队列也能收到消息了,因为队列的通配符是*秒#,而routing_key是topic秒。路由。键,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
// 向第三个队列发送消息。此时队列无法接受消息,因为队列通配符是*第三个*,而routing_key是topics第三个。路由。键,匹配失败
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
}
@GetMapping(value = "/header")
public void header() {
// 这个消息应该被两个队列接收。第一个队列全部匹配成功,第二个队列 Hello 值任意匹配成功
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("matchAll", "YES");
messageProperties.setHeader("你好", "world");
Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);
// 这个消息应该只被第二个队列接受。第一个队列全部匹配失败,
MessageProperties messagePropertiesSecond = new MessageProperties();
messagePropertiesSecond.setHeader("matchAll", "NO");
Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
}
@GetMapping(value = "/rpc")
public void rpc() {
Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
System.out.println("rabbit rpc 响应消息:" + responseMsg);
}
}
生产者声明队列并向队列发送消息
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Queue;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitSimpleProvider {
@Bean
public Queue simpleQueue() {
return new Queue(RabbitConstant.SIMPLE_QUEUE_NAME);
}
}
消费者监听队列并消费消息
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component
public class RabbitSimpleConsumer {
@RabbitHandler
@RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME)
public void simpleListener(String context) {
System.out.println("rabbit receiver: " + context);
}
}
单元测试
@Test
public void simple() {
rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
}
响应结果
生产者声明队列并向队列生成消息
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Queue;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration
公共类 RabbitWorkProvider {
@Bean
公共队列 workQueue() {
return new Queue(RabbitConstant.WORK_QUEUE_NAME);
}
}
消费者监听队列并消费消息(有两个消费者监听同一个队列)
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component
public class RabbitWorkConsumer {
@RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
@RabbitHandler
public void workQueueListenerFirst(String context) {
System.out.println("rabbit workQueue listener first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
@RabbitHandler
public void workQueueListenerSecond(String context) {
System.out.println("rabbit workQueue listener 第二个接收者:" + context);
}
}
单元测试
@Test
public void work() {
rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "工作你好!");
}
响应结果(由于有两个消费者监听同一个队列,所以消息只能被其中一个消费者消费。默认情况下,消息是负载均衡发送给所有消费者的)
生产者声明两个队列和一个扇出交换机,并将两个队列绑定到交换机
开关有四种类型:fanout、direct、topic和header(文末有介绍)
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.*;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration
公共类 RabbitPublishSubscribeProvider {
@Bean
公共队列 pubsubQueueFirst() {
return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME);
}
@Bean
公共队列 pubsubQueueSecond() {
return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME);
}
@Bean
公共 FanoutExchange fanoutExchange() {
// 创建扇出类型开关,表示交换机会向所有绑定队列发送消息
return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME);
}
@Bean
public Binding pubsubQueueFirstBindFanoutExchange() {
// 队列绑定开关
return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange());
}
@Bean
public Binding pubsubQueueSecondBindFanoutExchange() {
// 队列二绑定开关
return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange());
}
}
消费者监听队列并消费
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component
public class RabbitPublishSubscribeConsumer {
@RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME)
@RabbitHandler
public void pubsubQueueFirst(String context) {
System.out.println("rabbit pubsub queue first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME)
@RabbitHandler
public void pubsubQueueSecond(String context) {
System.out.println("rabbit pubsub 队列第二个接收者:" + context);
}
}
单元测试
@Test
public void pubsub() {
rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "发布/订阅你好");
}
响应结果
生产者声明三个队列和一个直接交换机,将三个队列绑定到交换机,并设置交换机和队列之间的路由
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitRoutingProvider {
@Bean
public Queue rabbitRoutingFirstQueue() {
return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME);
}
@Bean
public Queue rabbitRoutingSecondQueue() {
return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME);
}
@Bean
public Queue rabbitRoutingThirdQueue() {
return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME);
}
@Bean
public DirectExchange directExchange() {
// Create a direct switch, indicating that the exchange opportunity sends messages to routing_ Queue with identical key
return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME);
}
@Bean
public Binding routingFirstQueueBindDirectExchange() {
// Bind the direct switch to the queue and set the routing_key is routing_first_queue_routing_key
return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME);
}
@Bean
public Binding routingSecondQueueBindDirectExchange() {
// Queue 2 binds the direct switch and sets routing_key is routing_second_queue_routing_key
return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME);
}
@Bean
public Binding routingThirdQueueBindDirectExchange() {
// The queue 3 is bound to the direct switch and the routing is set_ Key is routing_third_queue_routing_key
return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME);
}
}
消费者监听队列并消费
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component
public class RabbitRoutingConsumer {
@RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME)
@RabbitHandler
public void routingFirstQueueListener(String context) {
System.out.println("rabbit routing queue first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME)
@RabbitHandler
public void routingSecondQueueListener(String context) {
System.out.println("rabbit pubsub 队列第二个接收者:" + context);
}
@RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME)
@RabbitHandler
public void routingThirdQueueListener(String context) {
System.out.println("rabbit pubsub 队列第三个接收者:" + context);
}
}
单元测试
@Test
public void routing() {
// 向第一个队列发送消息
rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
}
响应结果
生产者声明了三个队列和一个主题切换。队列分别与主题交换机绑定,并设置了路由键统一字符。如果路由键满足交换机和队列之间的通配符要求,则将消息存储在队列中
#通配符可以匹配一个或多个单词,*通配符可以匹配一个单词;如果Exchange和队列之间的routing key通配符是#hello.#,则表示中间所有带hello的routing key都满足条件,消息会被存入队列
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.core.Binding;
导入 org.springframework.amqp.core.BindingBuilder;
导入 org.springframework.amqp.core.Queue;
导入 org.springframework.amqp.core.TopicExchange;
导入 org.springframework.context.annotation.Bean;
导入 org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTopicProvider {
@Bean
public Queue topicFirstQueue() {
return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME);
}
@Bean
公共队列 topicSecondQueue() {
return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME);
}
@Bean
公共队列 topicThirdQueue() {
return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME);
}
@Bean
public TopicExchange topicExchange() {
// 创建一个主题类型切换,表示交换机会发送消息到 routing_key 通配符匹配队列成功
return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME);
}
@Bean
public Binding topicFirstQueueBindExchange() {
// 绑定topic类型切换到队列1并设置routing_key通配符#first.#
return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD);
}
@Bean
public Binding topicSecondQueueBindExchange() {
//第二个队列绑定主题类型切换,设置路由_key通配符为* second.#
return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()) .with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD);
}
@Bean
public Binding topicThirdQueueBindExchange() {
// 三个队列绑定主题切换,设置routing_key通配符为*third.*
return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD);
}
}
消费者监听队列并消费
导入 com.example.rabbitmq.constant.RabbitConstant;
导入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
导入 org.springframework.amqp.rabbit.annotation.RabbitListener;
导入 org.springframework.stereotype.Component;
@Component
public class RabbitTopicsConsumer {
@RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME)
@RabbitHandler
public void topicFirstQueue(String context) {
System.out.println("rabbit topics queue first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME)
@RabbitHandler
public void topicSecondQueue(String context) {
System.out.println("兔子主题队列第二个接收者:" + context);
}
@RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME)
@RabbitHandler
public void topicThirdQueue(String context) {
System.out.println("rabbit 主题队列第三个接收者:" + context);
}
}
单元测试
@Test
public void topics() {
// 向第一个队列发送消息。这时候队列可以接收到消息,因为队列的通配符是#first.#,而routing_key是topics first。路由。键,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
// 向第二个队列发送消息。这时候队列也能收到消息了,因为队列的通配符是*秒#,而routing_key是topic秒。路由。键,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
// 向第三个队列发送消息。此时队列无法接受消息,因为队列通配符是*第三个*,而routing_key是topics第三个。路由。键,匹配失败
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
}
响应结果
以上就是关于“Rabbitmq的几种模式介绍”,如果大家想了解更多相关知识,可以关注一下极悦的RabbitMQ教程,里面的课程内容细致全面,通俗易懂,适合小白学习,希望对大家能够有所帮助。
你适合学Java吗?4大专业测评方法
代码逻辑 吸收能力 技术学习能力 综合素质
先测评确定适合在学习