更新时间:2021-07-28 17:02:21 来源:极悦 浏览1095次
持久化消息和非持久化消息的发送策略
消息同步发送和异步发送
ActiveMQ支持同步、异步两种发送模式将消息发送到broker上。同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理。这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能。
异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高。所以在发送持久化消息的时候,尽量去开启事务会话。
除了持久化消息和非持久化消息的同步和异步特性外,还可以通过以下几种方式来设置异步发送
消息的发送原理分析图解
ProducerWindowSize的含义
producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。代码在:ActiveMQSession的1957行
主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。每次发送消息之后,都将会导致memoryUsage大小增加(+message.size),当broker返回producerAck时,memoryUsage尺寸减少(producerAck.size,此size表示先前发送消息的大小)。
Ø在brokerUrl中设置:"tcp://localhost:61616?jms.producerWindowSize=1048576",这种设置将会对所有的producer生效。
Ø在destinationUri中设置:"test-queue?producer.windowSize=1048576",此参数只会对使用此Destination实例的producer失效,将会覆盖brokerUrl中的producerWindowSize值。
持久化消息和非持久化消息的存储原理
正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。能够存储的最大消息数据在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点
SystemUsage配置设置了一些系统内存和硬盘容量
Ø从上面的配置我们需要get到一个结论,当非持久化消息堆积到一定程度的时候,也就是内存超过指定的设置阀值时,ActiveMQ会将内存中的非持久化消息写入到临时文件,以便腾出内存。但是它和持久化消息的区别是,重启之后,持久化消息会从文件中恢复,非持久化的临时文件会直接删除
ActiveMQ消息持久性对于可靠消息传递来说是一种比较好的方法,即时发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去。消息持久性的原理很简单,就是在发送消息出去后,消息中心首先将消息存储在本地文件、内存或者远程数据库,然后把消息发送给接受者,发送成功后再把消息从存储中删除,失败则继续尝试。接下来我们来了解一下消息在broker上的持久化存储实现方式
ActiveMQ支持多种不同的持久化方式,主要有以下几种,不过,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
ØKahaDB存储(默认存储方式)ØJDBC存储
ØMemory存储
ØLevelDB存储
ØJDBC With ActiveMQ Journal
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
KahaDB的配置方式
在data/kahadb这个目录下,会生成四个文件
Ødb.data它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息
Ødb.redo用来进行消息恢复
Ødb-*.log存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增
Ølock文件锁,表示当前获得kahadb读写权限的broker
JDBC存储
使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。
ACTIVEMQ_MSGS消息表,queue和topic都存在这个表中
ACTIVEMQ_ACKS存储持久订阅的信息和最后一个持久订阅接收的消息ID
ACTIVEMQ_LOCKS锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库
JDBC存储实践
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false
Mysql持久化Bean配置
添加Jar包依赖
消息消费流程图
消息重发的情况
在正常情况下,有几中情况会导致消息重新发送
Ø在事务性会话中,没有调用session.commit确认消息或者调用
session.rollback方法回滚消息
Ø在非事务性会话中,ACK模式为CLIENT_ACKNOWLEDGE的情况下,没有调用acknowledge或者调用了recover方法;
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费端会给broker发送一个”poison ack”(ActiveMQMessageConsumer#dispatch:1460行),表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。
ActiveMQ采用消息推送方式,所以最适合的场景是默认消息都可在短时间内被消费。数据量越大,查找和消费消息就越慢,消息积压程度与消息速度成反比。
1.吞吐量低。由于ActiveMQ需要建立索引,导致吞吐量下降。这是无法克服的缺点,只要使用完全符合JMS规范的消息中间件,就要接受这个级别的TPS。
2.无分片功能。这是一个功能缺失,JMS并没有规定消息中间件的集群、分片机制。而由于ActiveMQ是伟企业级开发设计的消息中间件,初衷并不是为了处理海量消息和高并发请求。如果一台服务器不能承受更多消息,则需要横向拆分。ActiveMQ官方不提供分片机制,需要自己实现。
适用场景
对TPS要求比较低的系统,可以使用ActiveMQ来实现,一方面比较简单,能够快速上手开发,另一方面可控性也比较好,还有比较好的监控机制和界面
消息量巨大的场景。ActiveMQ不支持消息自动分片机制,如果消息量巨大,导致一台服务器不能处理全部消息,就需要自己开发消息分片功能。
以上就是极悦小编介绍的"ActiveMQ原理分析",希望对大家有帮助,想了解更多可查看ActiveMQ教程。极悦在线学习教程,针对没有任何Java基础的读者学习,让你从入门到精通,主要介绍了一些Java基础的核心知识,让同学们更好更方便的学习和了解Java编程,感兴趣的同学可以关注一下。
0基础 0学费 15天面授
Java就业班有基础 直达就业
业余时间 高薪转行
Java在职加薪班工作1~3年,加薪神器
工作3~5年,晋升架构
提交申请后,顾问老师会电话与您沟通安排学习