为Java多线程应用程序优化数据存储库
数据存储库通常是超高要求系统的瓶颈。在这些系统中,正在执行的查询数量非常大。DelayedBatchExecutor是一个用于减少所需查询数量的组件,通过在Java多线程应用程序中对所需查询进行批处理。
1个参数的n个查询Vs.n个参数的1个查询
假设有一个对关系数据库执行查询的Java应用程序,以便在给定其唯一标识符(id)的情况下检索Product实体(row)。
查询如下所示:
SELECT*FROMPRODUCTWHEREID=<productId>
现在,检索n个Products,有如下两种方法:
执行1个参数的n个独立查询:
SELECT*FROMPRODUCTWHEREID=<productId1>
SELECT*FROMPRODUCTWHEREID=<productId2>
...
SELECT*FROMPRODUCTWHEREID=<productIdn>
使用IN运算符或ORs的串联,对n个参数执行1个查询以便同时检索n个Products
--ExampleusingINOPERATOR
SELECT*FROMPRODUCTWHEREIDIN(<productId1>,<productId2>,...,<productIdn>)
后者在网络流量和数据库服务器资源(CPU和磁盘)方面更为有效,因为:
往返数据库的次数为1,而不是n。
数据库引擎优化了n个参数的数据遍历过程,即每个表格可能只需要扫描1次,而不是n次。
这不仅适用于SELECT操作,而且适用于其他操作,例如INSERTs,UPDATEs和DELETEs。实际上,JDBCAPI包括上述操作的批量处理操作。
同样的情况也适用于NoSQL存储库,其中大多都明确提供BULK操作。
DelayedBatchExecutor
需要从数据库中检索数据的Java应用程序,如REST微服务或异步消息处理器,通常以多线程应用程序(*1)实现,其中:
每个线程在其执行的某个时刻执行相同的查询(每个查询具有不同的参数)。
并发线程数很高(每秒数十或数百)。
在这种场景下,数据库很可能在较短的时间间隔内多次执行相同的查询。
如前所述,如果将1个参数的n个查询替换为具有n个参数的单个等效查询,那么则应用程序将使用较少的数据库服务器和网络资源。
好消息是它可以通过timewindows(时间窗口)的机制来实现,如下所示:
第一个尝试执行查询的线程会打开一个时间窗口,因此其参数被存储在一个列表中,同时该线程被暂停。在时间窗口内执行相同查询的其余线程会将其参数添加到列表中,并且也会被暂停。此时,数据库上未执行任何查询。
时间窗口结束或列表已满(先前已定义最大容量限制)后,将使用列表中存储的所有参数执行单个查询。最后,一旦数据库提供了该查询的结果,每个线程将接收相应的结果,同时所有线程将自动恢复。
笔者构建了一个简单而轻量级的应用机制(DelayedBatchExecutor),很容易在新的或现有的应用程序中使用。它基于Reactor库,并且为参数列表使用超时的Flux缓冲发布器。
运用DelayedBatchExecutor的吞吐量和延迟分析
假设针对Products的REST微服务公开了一个端点,用于检索数据库中给定的productId的Product数据。在没有DelayedBatchExecutor的情况下,如果每秒对端点命中200次,则数据库每秒执行200个查询。如果端点使用的DelayedBatchExecutor配置了50毫秒的时间窗口且最大容量=10个参数,数据库每秒钟将只执行10个参数的20个查询,代价是每执行一个线程,最多在50毫秒内增加延时(*2)。
换句话说,为了将延时增加50毫秒(*2),数据库每秒接收的查询减少了10倍,然而保持了系统的整体吞吐量。还不错!!
其他有趣的配置:
窗口时间=100毫秒,最大容量=20个参数→20个参数的10个查询(查询减少20倍)
窗口时间=500毫秒,最大容量=100个参数→2个查询100个参数(查询减少100倍)
执行中的DelayedBatchExecutor
深入研究Product微服务示例。假设对于每个传入的HTTP请求,微服务的控制器都要求检索已有id的Product(JavaBean),因此将调用以下方法:
DAO组件(ProductDAO)的publicProductgetProductById(IntegerproductId).
以下分别是有和没有DelayedBatchExecutor的DAO执行。
没有DelayedBatchExecutor
publicclassProductDAO{
publicProductgetProductById(Integerid){
Productproduct=...//executethequerySELECT*FROMPRODUCTWHEREID=<id>
//usingyourfavouriteAPI:JDBC,JPA,Hibernate...
returnproduct;
}
...
}
有DelayedBatchExecutor
//Singleton
publicclassProductDAO{
DelayedBatchExecutor2<Product,Integer>delayedBatchExecutorProductById=
DelayedBatchExecutor.define(Duration.ofMillis(50),10,this::retrieveProductsByIds);
publicProductgetProductById(Integerid){
Productproduct=delayedBatchExecutorProductById.execute(id);
returnproduct;
}
privateList<Product>retrieveProductsByIds(List<Integer>idList){
List<Product>productList=...//executequery:SELECT*FROMPRODUCTWHEREIDIN(idList.get(0),...,idList.get(n));
//usingyourfavouriteAPI:JDBC,JPA,Hibernate...
//Thepositionsoftheelementsofthelisttoreturnmustmatchtheonesintheparameterslist.
//Forinstance,thefirstProductofthelisttobereturnedmustbetheonewith
//theIdinthefirstpositionofproductIdsListandsoon...
//NOTE:nullcouldbeusedasvalue,meaningthatnoProductexistforthegivenproductId
returnproductList;
}
...
}
首先,必须在DAO中创建一个DelayedBatchExecutor实例,在本例中为delayedBatchExecutorProductById。需要以下三个参数:
时间窗口(在此示例中为50毫秒)
参数列表的最大容量(在此示例中为10个参数)
将使用参数列表调用的方法(详细信息见后文)。在此示例中,方法为retrieveProductsByIds
其次,已经重构了DAO方法publicProductgetProductById(IntegerproductId),以简单调用delayedBatchExecutorProductById实例的execute方法。所有的“magic”都是由DelayedBatchExecutor完成的。
之所以delayedBatchExecutorProductById是DelayedBatchExecutor2<Product,Integer>的实例,是因为其execute方法返回一个Product实例并接收一个Integer实例作为其实际参数。因此,存在:DelayedBatchExecutor2<Product,Integer>。
如果execute方法需要接收两个参数(例如,一个Integer和一个String)并返回Product实例,则定义为DelayedBatchExecutor3<Product,Integer,String>等。
最终,retrieveProductsByIds方法必须返回List<Product>并接收List<Integer>作为参数。
如果使用的是DelayedBatchExecutor3<Product,Integer,String>,则必须将retrieveProductsByIds设为List<Product>retrieveProductsByIds(List<Integer>productIdsList,List<String>stringList)
就是这样。
一旦运行,执行控制器逻辑的并发线程会在某时刻调用方法getProductById(Integerid),并且此方法将返回对应的Product。并发线程不知自己已经被DelayedBatchExecutor暂停并恢复了。
以上就是极悦注册机构小编介绍的“java数据库多线程应用程序优化数据存储库”的内容,希望对大家有帮助,如有疑问,请在线咨询,有专业老师随时为你服务。
相关内容
你适合学Java吗?4大专业测评方法
代码逻辑 吸收能力 技术学习能力 综合素质
先测评确定适合在学习