更新时间:2022-02-15 10:18:51 来源:极悦 浏览2261次
Redis 单节点实现了分布式锁。如果你通过 Sentinel 有高可用,如果主会话发生变化,如果某些原因发生变化,就会发生锁丢失。
(1)客户端 1 获得 Redis 的 Master 节点上的锁。
(2)Master宕机,存储锁的钥匙还没有到Slave。
(3)Master故障,故障转移,SLAVE节点升级为Master节点。
(4)Client 2 从新的 master 获取相同资源对应的锁。
这样,客户端1和客户端2同时持有相同资源的锁。锁的安全性被破坏了。对于这个问题。Redis 作者 Antirez 提出了 Redlock 算法来解决这个问题。
当不同进程需要互斥访问共享资源时,分布式锁是一种非常有用的技术手段。实现高效分布式锁的三个属性需要考虑:
(1)安全属性:互斥,无论何时,只有一个客户端持有锁。
(2)效率属性 A:不会死。
(3)效率属性B:容错性,只要大部分Redis节点正常工作,客户端都可以获取和释放锁。
在算法的分布式版本中,我们假设我们有 N 个完全独立的 Redis Master 节点,我们没有任何副本或其他隐式分布式协调算法。我们已经描述了如何在单节点环境中安全地获取和释放锁。因此,我们自然应该使用这种方法来获取和释放每个单个节点中的锁。在我们的示例中,我们将 N 设置为 5,这个数字是一个比较合理的值,所以我们需要在不同的计算机或虚拟机上运行 5 个 Master 节点,以确保它们不会同时出现。机器。客户端需要执行以下操作才能获得锁:
(1)获取当前时间(单位为毫秒)。
(2)turnt用于n个节点上相同的key和随机值。在这一步中,当客户端请求每个master上的锁时,会有一个比总锁释放时间小的超时。时间。比如锁自动释放时间为10秒,每个节点锁请求的超时时间可能在5-50毫秒的范围内,这样可以防止客户端在一个过期的Master节点上阻塞太久,如果一个Master节点是不可用,我们应该尽快尝试下一个Master节点。
(3)客户端计算第二步花费锁的时间,只有当客户端成功获取锁(这里是3),并且总时间消耗不超过锁释放时间,这个锁才算成功.
(4)如果锁获取成功,锁自动释放时间是在锁消耗之前花费初始锁释放时间的时间。
(5)如果锁获取失败,无论是因为锁成功不超过一半(N/2+1)还是因为总耗时时间超过锁释放时间,客户端都会释放每个Master节点上的锁,即使是那些相信没有成功锁定的人。
Redisson 包已经封装了 RedLock 算法,该算法需要使用 Redisson 包查看分布式锁的正确姿势。
<!-- JDK 1.8 + 兼容 -->
<依赖>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<版本> 3.9。0 </版本>
</依赖>
<!-- JDK 1.6 + 兼容 -->
<依赖>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<版本> 2.14。0 </版本>
</依赖>
Redisson管理类:
导入 org.redisson.Redisson;
导入 org.redisson.api.RAtomicLong;
导入 org.redisson.api.RedissonClient;
导入 org.redisson.config.Config;
公共 类RedissonManager {
私有 静态配置配置 =新配置();
私有 静态RedissonClient redisson = null ;
private static final String RAtomicName = " genId_ " ;
公共 静态 无效初始化(){
尝试{
config.useClusterServers()
.setScanInterval( 200000 ) //设置集群状态扫描间隔
.setMasterConnectionPoolSize( 10000 ) //设置Master节点的连接池最大连接数为10000
.setSlaveConnectionPoolSize( 10000 ) //设置Master节点的最大连接数SLAVE 节点的连接池为 500
.setIdleConnectionTimeout( 10000 ) //如果当前连接池中的连接数超过了最小空闲连接,同时还有一个连接空闲时间超过了该值,那么这些连接将被自动关闭并从连接池中移除。时间单位是毫秒。
.setConnectTimeout( 30000 ) //等待与任意节点建立连接。时间单位是毫秒。
.setTimeout( 3000 ) //等待节点回复命令。该时间从命令发送成功时开始。
.setRetryInterval( 3000 ) //当与节点断开连接时,等待时间间隔重新建立连接。时间单位是毫秒。
.addNodeAddress( " redis://127.0.0.1:7000 " , " redis://127.0.0.1:7001 " , " redis://127.0.0.1:7002 " , " redis://127.0.0.1:7003 " , " redis://127.0.0.1:7004 " , " redis://127.0.0.1:7005 " );
redisson = Redisson.create(config);
RAtomicLong atomicLong = redisson.getAtomicLong (RAtomicName);
原子长。设置(0);//自增设置为从 0 开始
} catch (Exception e){
e.printStackTrace();
}
}
公共 静态RedissonClient getRedisson(){
if (redisson == null ){
RedissonManager.init(); //初始化
}
return redisson;
}
我们已经配置了很多参数,其实有十个参数,我们只是设置了几个比较重要的。
getredisson 方法是用户初始化 Redisson。
NEXTID 方法 返回 RatomicName 变量的总次数,也就是我成功使用分布式锁的次数。
分布式锁定操作类:
导入 com.config.RedissonManager;
导入 org.redisson.api.RLock;
导入 org.redisson.api.RedissonClient;
导入 org.slf4j.Logger;
导入 org.slf4j.LoggerFactory;
导入 org.springframework.stereotype.Component;
导入 java.util.concurrent.TimeUnit;
@零件
public class RedissonLock {
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonLock.class ) ;
私有 静态RedissonClient redissonClient = RedissonManager.getRedisson();
公共 无效 锁(字符串锁名){
字符串键=锁名;
RLock myLock = redissonClient.getLock(key);
// Lock 提供 Timeout 参数,Timeout 结束强制解锁防止死锁
myLock. 锁定(2 ,TimeUnit.SECONDS);
// 1. 最常用的使用方法
// lock.lock();
// 2. 支持过期解锁功能,10秒后自动解锁,无需调用UNLOCK方法手动解锁
// lock.lock(10, TimeUnit.SECONDS);
// 3. 尝试加锁,等待3秒,加锁后10秒自动解锁
// try {
// boolean res = mylock.tryLock(3, 10, TimeUnit.SECONDS);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.err.println( " ======lock====== " + Thread.currentThread().getName());
}
公共 无效解锁(字符串锁名){
字符串键=锁名;
RLock myLock = redissonClient.getLock(key);
myLock.unlock();
System.err.println( " ======解锁====== " + Thread.currentThread().getName());
}
}
LOCK方法是锁定操作,UNLOCK方法是解锁。如果您想了解更多相关知识,可以关注一下极悦的Java极悦在线学习,里面的课程从入门到精通,通俗易懂,适合小白学习,希望对大家能够有所帮助。
0基础 0学费 15天面授
Java就业班有基础 直达就业
业余时间 高薪转行
Java在职加薪班工作1~3年,加薪神器
工作3~5年,晋升架构
提交申请后,顾问老师会电话与您沟通安排学习