本文共 7500 字,大约阅读时间需要 25 分钟。
很久之前有讲过并发编程中的锁并发编程的锁机制:synchronized和lock。在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。
分布式环境下,数据一致性问题一直是一个比较重要的话题,而又不同于单进程的情况。分布式与单机情况下最大的不同在于其不是多线程而是多进程。多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。
基于数据库的锁实现也有两种方式,一是基于数据库表,另一种是基于数据库排他锁。
基于数据库表增删是最简单的方式,首先创建一张锁的表主要包含下列字段:方法名,时间戳等字段。
具体使用的方法,当需要锁住某个方法时,往该表中插入一条相关的记录。这边需要注意,方法名是有唯一性约束的,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。执行完毕,需要delete该记录。当然,笔者这边只是简单介绍一下。对于上述方案可以进行优化,如应用主从数据库,数据之间双向同步。一旦挂掉快速切换到备库上;做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍;使用while循环,直到insert成功再返回成功,虽然并不推荐这样做;还可以记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了,实现可重入锁。
我们还可以通过数据库的排他锁来实现分布式锁。基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:
public void lock(){ connection.setAutoCommit(false) int count = 0; while(count < 4){ try{ select * from lock where lock_name=xxx for update; if(结果不为空){ //代表获取到锁 return; } }catch(Exception e){ } //为空或者抛异常的话都表示没有获取到锁 sleep(1000); count++; } throw new LockException();}
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。其他没有获取到锁的就会阻塞在上述select语句上,可能的结果有2种,在超时之前获取到了锁,在超时之前仍未获取到锁。
获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,释放锁connection.commit()。
存在的问题主要是性能不高和sql超时的异常。
上面两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。
基于zookeeper临时有序节点可以实现的分布式锁。每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
提供的第三方库有curator,具体使用读者可以自行去看一下。Curator提供的InterProcessMutex是分布式锁的实现。acquire方法获取锁,release方法释放锁。另外,锁释放、阻塞锁、可重入锁等问题都可以有有效解决。讲下阻塞锁的实现,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是就获取到锁,便可以执行业务逻辑。
最后,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。并发问题,可能存在网络抖动,客户端和ZK集群的session连接断了,zk集群以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。
相对于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点,存取速度快很多。而且很多缓存是可以集群部署的,可以解决单点问题。基于缓存的锁有好几种,如memcached、redis、本文下面主要讲解基于redis的分布式实现。
基于redis的分布式锁实现
使用redis的SETNX实现分布式锁,多个进程执行以下Redis命令:
SETNX lock.id
SETNX是将 key 的值设为 value,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。
SETNX实现分布式锁,可能会存在死锁的情况。与单机模式下的锁相比,分布式环境下不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。某个线程获取了锁之后,断开了与Redis 的连接,锁没有及时释放,竞争该锁的其他线程都会hung,产生死锁的情况。
在使用 SETNX 获得锁时,我们将键 的值设置为锁的有效时间,线程获得锁后,其他线程还会不断的检测锁是否已超时,如果超时,等待的线程也将有机会获得锁。然而,锁超时,我们不能简单地使用 DEL 命令删除键 以释放锁。
考虑以下情况:
对于上面的步骤进行改进,问题是出在删除键的操作上面,那么获取锁之后应该怎么改进呢?
首先看一下redis的GETSET这个操作,GETSET key value,将给定 key 的值设为 value ,并返回 key 的旧值(old value)。利用这个操作指令,我们改进一下上述的步骤。获取锁
public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException { acquireTimeout = timeUnit.toMillis(acquireTimeout); long acquireTime = acquireTimeout + System.currentTimeMillis(); //使用J.U.C的ReentrantLock threadLock.tryLock(acquireTimeout, timeUnit); try { //循环尝试 while (true) { //调用tryLock boolean hasLock = tryLock(); if (hasLock) { //获取锁成功 return true; } else if (acquireTime < System.currentTimeMillis()) { break; } Thread.sleep(sleepTime); } } finally { if (threadLock.isHeldByCurrentThread()) { threadLock.unlock(); } } return false;}public boolean tryLock() { long currentTime = System.currentTimeMillis(); String expires = String.valueOf(timeout + currentTime); //设置互斥量 if (redisHelper.setNx(mutex, expires) > 0) { //获取锁,设置超时时间 setLockStatus(expires); return true; } else { String currentLockTime = redisUtil.get(mutex); //检查锁是否超时 if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) { //获取旧的锁时间并设置互斥量 String oldLockTime = redisHelper.getSet(mutex, expires); //旧值与当前时间比较 if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) { //获取锁,设置超时时间 setLockStatus(expires); return true; } } return false; }}
lock调用tryLock方法,参数为获取的超时时间与单位,线程在超时时间内,获取锁操作将自旋在那里,直到该自旋锁的保持者释放了锁。
tryLock方法中,主要逻辑如下:
public boolean unlock() { //只有锁的持有线程才能解锁 if (lockHolder == Thread.currentThread()) { //判断锁是否超时,没有超时才将互斥量删除 if (lockExpiresTime > System.currentTimeMillis()) { redisHelper.del(mutex); logger.info("删除互斥量[{}]", mutex); } lockHolder = null; logger.info("释放[{}]锁成功", mutex); return true; } else { throw new IllegalMonitorStateException("没有获取到锁的线程无法执行解锁操作"); }}
/** * 资金方 充值 * * @param req * @return */ @DisableDecode @PostMapping(value = "/recharge", produces = MediaType.APPLICATION_JSON_VALUE) @ApiOperation(value = "充值", httpMethod = "POST") public ResponseBeanrecharge(@Validated @RequestBody RequestBean req) { FundRechargeRequestData data = req.getData(); // 获得锁对象实例 RLock lock = redissonClient.getLock(RedisContants.DIST_LOCK_FUND + data.getFundId()); try { // 尝试加锁,最多等待3秒,上锁以后10秒自动解锁 方法尝试获取锁,如果成功获得锁,则继续往下执行,否则等待锁被释放,然后再继续尝试获取锁,直到成功获得锁。 boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS); if (res) { // 成功 fundAmountOperateStrategyContext.handle(FundAmountOptTypeEnum.RECHARGE, data.getFundId(), data.getAmount(), data.getOperator(), data.getRemark()); } } catch (Exception e) { log.error("recharge error: ", e); } finally { // 方法释放获得的锁,并通知等待的节点锁已释放。 lock.unlock(); } return new ResponseBean(Status.SUCCESS); }
本文主要讲解了基于redis分布式锁的实现,在分布式环境下,数据一致性问题一直是一个比较重要的话题,而synchronized和lock锁在分布式环境已经失去了作用。常见的锁的方案有基于数据库实现分布式锁、基于缓存实现分布式锁、基于Zookeeper实现分布式锁,简单介绍了每种锁的实现特点;然后,文中探索了一下redis锁的实现方案;最后,本文给出了基于Java实现的redis分布式锁,读者可以自行验证一下。
转载地址:http://zwrli.baihongyu.com/