更新时间:2022-05-10 10:18:20 来源:极悦 浏览2918次
当发生以下情况之一时,来自消息队列的可能是“死信”:
消息被拒绝并且重新排队设置为 false
消息的 TTL 过期
超出队列长度限制
为了通过示例进行演示,我选择了第一种情况,即消息被拒绝。生产者将PaymentOrders作为消息发送,这些消息将由消费者处理。当PaymentOrder付款人账户资金不足时,消息将被拒绝。
生产者是一个 Spring Boot 应用程序,它使用Spring AMQP库向PaymentOrderRabbitMQ 发送消息。
生产者的 API
生产者 API 的第一部分是定义交换器的名称、路由密钥、传入和死信队列。
public class Constants {
public static final String EXCHANGE_NAME = "payment-orders.exchange";
public static final String ROUTING_KEY_NAME = "payment-orders";
public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
}
第二部分是定义消息格式。我们在此示例中使用 JSON。以下 JSON 文档显示了我们如何建模PaymentOrder
{
"from":"SA54 22PS JCLV 7LWT 7LHY EBLO",
"to":"IT23 K545 5414 339G WLPI 2YF6 VBP",
"amount":54.75
}
请注意,最好不要使用自定义序列化格式(如有效负载的 Java 序列化),因为这意味着您需要有一个基于 Java 的使用者。好的做法是将有效负载格式化为 JSON。每个平台和/或语言都可以解析 JSON。
生产者配置
我们需要配置 AMQP 基础设施。死信队列配置封装在传入队列声明中。
有一个死信交换direct(DLX) 的概念,它是类型topic或的正常交换fanout。如果在处理从队列中获取的消息期间发生故障,RabbitMQ 会检查是否为该队列配置了死信交换。如果通过x-dead-letter-exchange参数配置了一个,那么它将使用原始路由密钥将失败的消息路由到它。可以通过x-dead-letter-routing-key参数覆盖此路由键。
在此示例中,我们使用default exchange(no-name) 作为 the dead letter exchange,并使用死信队列名称作为新的路由键。这将起作用,因为任何队列都绑定到默认交换,绑定键等于队列名称。
@Configuration
public class AmqpConfig {
@Bean
DirectExchange exchange() {
return new DirectExchange(Constants.EXCHANGE_NAME);
}
@Bean
Queue incomingQueue() {
return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
.build();
}
@Bean
Binding binding() {
return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME);
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
用于队列和交换的构建器 API 非常方便,并且从 Spring AMQP 库的 1.6 版本开始可用。
在 RabbitMQ 管理控制台中,DLX和DLK标签指示在传入队列上设置了dead letter exchange和dead letter routing key参数。
生产者逻辑
生产者每 5 秒生成一次随机PaymentOrder消息,这些消息被发送到 RabbitMQ 进行进一步处理。SpringAmqpTemplate是自动配置的,它可以连接到我们的组件中。由于消息格式是 JSON ,Jackson2JsonMessageConverter因此定义了它将自动关联到 auto-configured AmqpTemplate。
@Component
public class Producer {
private AmqpTemplate amqpTemplate;
public Producer(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
@Scheduled(fixedDelay = 1000L)
public void send() {
PaymentOrder paymentOrder = new PaymentOrder(
Iban.random().toFormattedString(),
Iban.random().toFormattedString(),
new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, BigDecimal.ROUND_FLOOR));
amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder);
}
}
对于这个简单的示例,消费者也是一个 Spring Boot 应用程序,但在实际应用程序中,消费者和生产者不必在同一平台/语言上。
消费者 API
消费者 API 的第一部分是指定它连接到哪个队列。
public class Constants {
public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
}
第二部分是适应生产者定义的消息格式。请注意,在这种情况下,两个应用程序都是基于 Java 的,因此我可以创建一个包含PaymentOrder类文件的 jar 文件并与消费者和生产者共享它。然而,这是不好的做法,因为它引入了基于共享库的紧密耦合。更好的方法是使用一些代码重复(PaymentOrder在这种情况下为类)并通过同意消息格式来使用更松散的耦合方法。
public class PaymentOrder {
String from;
String to;
BigDecimal amount;
@JsonCreator
public PaymentOrder(@JsonProperty("from") String from,
@JsonProperty("to") String to,
@JsonProperty("amount") BigDecimal amount) {
this.from = from;
this.to = to;
this.amount = amount;
}
// getters and toString()
}
消费者配置
消费者只关心从中获取消息的队列。传入队列必须存在,否则消费者将无法启动。请注意,dead letter queue消费者启动时不必存在 ,但在消息需要“死信”时它应该存在。如果它丢失,则消息将被静默丢弃。
@Configuration
public class AmqpConfig {
@Bean
Queue incomingQueue() {
return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
.build();
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
默认情况下启用重新排队。为了“死信”消息,您需要将以下属性设置为 false。
spring:
rabbitmq:
listener:
default-requeue-rejected: false
但是,如果您想在某些错误情况下启用重新排队,最好保持启用重新排队并利用AmqpRejectAndDontRequeueException将发送basic.reject带有 requeue=false 的选项。
消费逻辑
每当传入队列上有消息可用时,将使用反序列化的实例process调用该方法。在这里,我们通过抛出一个扩展异常PaymentOrder来模拟消息拒绝。InsufficientFundsExceptionAmqpRejectAndDontRequeueException
@Component
public class Consumer {
@RabbitListener(queues = Constants.INCOMING_QUEUE_NAME)
public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException {
if (new Random().nextBoolean()) {
throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom());
}
}
}
下图显示了一条消息的示例,该PaymentOrder消息被拒绝并最终进入dead letter queue
有时它有助于自动重试失败的操作,以防它可能在后续尝试中成功。RetryTemplateSpring AMQP 库在Spring Retry项目(从 Spring Batch 中提取)的帮助下提供了对此的支持。Spring Boot 使配置变得非常容易,RetryTemplate如下面的示例所示。
spring:
rabbitmq:
listener:
retry:
enabled: true
initial-interval: 2000
max-attempts: 2
multiplier: 1.5
max-interval: 5000
使用上述配置,重试功能已启用(默认情况下禁用),最多应有 2 次尝试传递消息,第一次和第二次尝试之间应为 2 秒,稍后与上一次重试间隔乘以 1.5 和最多 5 秒。运行您将在日志中看到的消费者
2016-09-07 21:56:53.396 INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.399 INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.401 WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])
如您所见,使用 RabbitMQ 配置死信队列非常简单。如果大家想了解更多相关知识,不妨来关注一下极悦的RabbitMQ教程,里面有更丰富的知识等着大家去学习,希望对大家能够有所帮助。
0基础 0学费 15天面授
Java就业班有基础 直达就业
业余时间 高薪转行
Java在职加薪班工作1~3年,加薪神器
工作3~5年,晋升架构
提交申请后,顾问老师会电话与您沟通安排学习