1、为什么需要分布式锁

在单机部署的系统中,使用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如使用synchornized、ReentrantLock等;
但是在后端集群部署的系统中,程序在不同的JVM虚拟机中运行,且因为synchronized或ReentrantLock都只能保证同一个JVM进程中保证有效,所以这时就需要使用分布式锁了。

2、什么是分布式锁

在这里插入图片描述

分布式锁的特点

在这里插入图片描述

3、 synchronized 进行加锁演示

建立三张表:商品表、订单表、订单商品表

CREATE TABLE `product` (
  `id` int(11) NOT NULL,
  `product_name` varchar(255) DEFAULT NULL COMMENT '商品名字',
  `price` decimal(10,2) DEFAULT NULL COMMENT '商品价格',
  `count` bigint(50) unsigned DEFAULT NULL COMMENT '库存',
  `product_desc` varchar(255) DEFAULT NULL COMMENT '商品描述',
  `version` int(255) DEFAULT NULL COMMENT '乐观锁',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

CREATE TABLE `t_order` (
  `id` varchar(255) NOT NULL,
  `order_status` int(1) DEFAULT NULL COMMENT '订单状态 1 待支付 2已支付',
  `receiver_name` varchar(255) DEFAULT NULL COMMENT '收货人名字',
  `receiver_mobile` varchar(255) DEFAULT NULL COMMENT '收货人手机',
  `order_amount` decimal(10,2) DEFAULT NULL COMMENT '订单价格',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

CREATE TABLE `order_item` (
  `id` varchar(255) NOT NULL,
  `order_id` varchar(36) DEFAULT NULL COMMENT '订单ID',
  `produce_id` int(11) DEFAULT NULL COMMENT '商品ID',
  `purchase_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格',
  `purchase_num` int(11) DEFAULT NULL COMMENT '购买数量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
  @Transactional(rollbackFor = Exception.class)
    @Override
    public synchronized String createOrder0(Integer produceId, Integer purchaseNum) {
        // 1、根据商品id获取商品信息
        Product product = productMapper.selectById(produceId);
        // 2、判断商品是否存在
        if (product == null) {
            throw new RuntimeException("购买商品不存在");
        }
        log.info(Thread.currentThread().getName() + "库存数量" + product.getCount());
        // 3、校验库存
        if (purchaseNum > product.getCount()) {
            throw new RuntimeException("库存不足");
        }
        // 4、更新库存操作
        int count = product.getCount() - purchaseNum;
        product.setCount(count);
        productMapper.updateById(product);
        // 5、创建订单
        TOrder order = new TOrder();
        //订单状态
        order.setOrderStatus(1);
        order.setReceiverName("张三");
        order.setReceiverMobile("18587781058");
        //订单价格
        order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseNum)));
        baseMapper.insert(order);
        // 6、创建订单和商品关联
        OrderItem orderItem = new OrderItem();
        //订单id
        orderItem.setOrderId(order.getId());
        // 商品id
        orderItem.setProduceId(product.getId());
        // 购买价格
        orderItem.setPurchasePrice(product.getPrice());
        // 购买数量
        orderItem.setPurchaseNum(purchaseNum);
        orderItemMapper.insert(orderItem);
        return order.getId();
    }

结果、商品表中有5个,但是订单表中却有9个订单,原因是在这个进程执行完后,锁就放开了,但是不能保证事务提交了。故采用下面这种方案来手动提交事务。

Spring进行了统一的抽象,形成了 PlatformTransactionManager事务管理器接口 ,事务的 提交、回滚等操作 全部交给它来实现

事务功能的总体接口设计

  • PlatformTransactionManager : 事务管理器
  • TransactionDefinition : 事务的一些基础信息,如超时时间、隔离级别、传播属性等
  • TransactionStatus : 事务的一些状态信息,如是否是一个新的事务、是否已被标记为回滚
   @Autowired
    private PlatformTransactionManager platformTransactionManager;
    @Autowired
    private TransactionDefinition transactionDefinition;
     @Override
    public  synchronized String createOrder(Integer produceId, Integer purchaseNum) {
        TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
        // 1、根据商品id获取商品信息
        Product product = productMapper.selectById(produceId);
        // 2、判断商品是否存在
        if (product == null) {
            platformTransactionManager.rollback(transaction);
            throw new RuntimeException("购买商品不存在");
        }
        log.info(Thread.currentThread().getName() + "库存数量" + product.getCount());
        // 3、校验库存
        if (purchaseNum > product.getCount()) {
            platformTransactionManager.rollback(transaction);
            throw new RuntimeException("库存不足");
        }
        // 4、更新库存操作
        int count = product.getCount() - purchaseNum;
        product.setCount(count);
        productMapper.updateById(product);
        // 5、创建订单
        TOrder order = new TOrder();
        //订单状态
        order.setOrderStatus(1);
        order.setReceiverName("张三");
        order.setReceiverMobile("18587781058");
        //订单价格
        order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseNum)));
        baseMapper.insert(order);
        // 6、创建订单和商品关联
        OrderItem orderItem = new OrderItem();
        //订单id
        orderItem.setOrderId(order.getId());
        // 商品id
        orderItem.setProduceId(product.getId());
        // 购买价格
        orderItem.setPurchasePrice(product.getPrice());
        // 购买数量
        orderItem.setPurchaseNum(purchaseNum);
        orderItemMapper.insert(orderItem);
        //提交事务
        platformTransactionManager.commit(transaction);
        return order.getId();
    }

4、分布式锁解决方案

在这里插入图片描述

分布式的CAP理论告诉我们:“任何一个系统都无法满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)”,最多满足两项。故设计之初就要进行取舍。互联网的绝大多数场景中都是牺牲系统的强一致性来换取系统的高可用性。系统往往只保证最终一致性,只要这个时间在可接受范围内。

4.1 分布式锁实现方案

基于数据库实现的分布式锁

基于数据库实现的分布式锁主要利用数据库的唯一索引来实现,唯一索引泰然具有排他性。

基于Redis实现的分布式锁

1、使用Redis实现分布式锁效率最高,加锁速度最快,因为Redis几乎都是纯内存操作,而数据库和zookeeper都会涉及到磁盘文件IO,效率相对低下。
2、一般Redis实现分布式锁都是利用Redis的SETNX key value这个命令,当key不存在时才执行成功,如果key已经存在则命令执行失败。

基于Zookeeper实现的分布式锁

Zookeeper一般用作配置中心,其实现分布式锁的原理与Redis类似,我们在Zookeeper中创建临时顺序节点,利用节点不能重复创建来保证排他性。

4.2、实现分布式锁需要考虑的问题

1、是否可重入
2、锁释放机制
3、分布式锁服务单点问题

5、数据库的悲观锁实现分布式锁(for update)

悲观锁每次拿数据的时候都会上锁。

 <select id="findById" resultType="com.tqq.lock.entity.Product">
        select * from product where id = #{id} for update
    </select>

6、数据库乐观锁实现分布式锁(version)

1、取出记录,获取当前version
2、更新时,带上这个version
3、执行更新时候,set version = newversion where version = oldVersion
4、如果version不对,就更新失败

MySQL默认隔离级别是可重复度,

乐观锁SQL
  <update id="updateProduct">
        update product set count = count - #{purchaseNum},version = version+1 where id = #{id} and version = #{version}
    </update>
业务代码

代码逻辑主要写了关键部分,其余和上面的代码类似。

 @Override
    @Transactional(rollbackFor = Exception.class)
    public String createOrderOptimisticlock(Integer produceId, Integer purchaseNum) {
       //更新商品数量重试次数
        int retryCount = 0;
        //更新结果
        int update = 0;
        // 1、根据商品id获取商品信息
        Product product = productMapper.findById(produceId);
        // 2、判断商品是否存在
        if (product == null) {
            throw new RuntimeException("购买商品不存在");
        }
        log.info(Thread.currentThread().getName() + "库存数量" + product.getCount());
        // 3、校验库存
        if (purchaseNum > product.getCount()) {
            throw new RuntimeException("库存不足");
        }
        /**
         * 乐观锁更新库存
         * 更新失败 说明其他线程已经更新过数据,本地扣减库存失败,可以重试,最多重试3次
         */

        while (retryCount < 3 && update == 0){
            update = this.reduceCount(produceId, purchaseNum);
            retryCount ++;
        }
        if (update == 0) {
            throw new RuntimeException("库存不够");
        }
        // 5、创建订单
        // 6、创建订单和商品关联
    }

更新库存方法

/**
     减库存
   * mysql 默认事务隔离级别可重复读,
   * 会导致在同一个事务里面查询3次   
  *	productMapper.selectById(produceId);
     *  得到的数据始终都是相同的。 所以我们就提供一个reduceCount 
     * 每次循环都启动一个新的事务扣减库存操作 。
     * @param produceId
     * @param purchaseNum
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public int reduceCount(Integer produceId, Integer purchaseNum) {
        int result = 0;
        // 1. 查询商品库存
        Product product = productMapper.selectById(produceId);
        // 2. 判断库存
        if (product.getCount() >= purchaseNum) {
            //3. 减库存  乐观锁更新库存
            result = productMapper.updateProduct(product.getId(), purchaseNum, product.getVersion());

        }
        return result;
    }

7、Redis实现分布式锁原理

获取锁
互斥:确保只有一个线程获得锁

setnx lock thread1

释放锁
1、手动释放
2、超时释放

#手动释放,删除即可
del lock

超时释放

setnx lock  thread1
expire lock 5
ttl lock

# 第二种方式

SET key value [EX seconds] [PX milliseconds] [NX|XX]
set lock k1 ex 5 nx
引入依赖
 <!--redis依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
配置
spring:
  redis:
    host: localhost
    port: 6379
代码实现
  @Override
    @Transactional(rollbackFor = Exception.class)
    public String createOrderRedis(Integer produceId, Integer purchaseNum) {

        String key = "lock";
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key + produceId, Thread.currentThread().getId() + "", 10, TimeUnit.SECONDS);
        if (!result){
            return "不允许重复下单!";
        }
        try {
            // 1、根据商品id获取商品信息
           
            // 2、判断商品是否存在
           
            // 3、校验库存
         
            // 4、更新库存操作
           
            // 5、创建订单
        
            // 6、创建订单和商品关联
           

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 1、获取锁标识
            String threadIdFlag = stringRedisTemplate.opsForValue().get(key + produceId);
            //2、获取当前线程id
            String id = Thread.currentThread().getId()+"";
            if (id.equals(threadIdFlag)){
                stringRedisTemplate.delete(key+produceId);
            }

        }
        return "创建失败!";
    }

在释放锁的时候判断锁标识,是自己线程的锁才可以释放锁,防止锁被误删除。如下图:
在这里插入图片描述

7.1、Redis分布式锁误删除问题解决方案

  • 设置超时时间远大于业务执行时间,会带来性能问题
  • 删除锁的时候判断,如果是自己的才删除
#1、配置锁标识
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString().replace("-","");
#2、获取锁
 //1、获取线程标识
 String threadId = ID_PREFIX + Thread.currentThread().getId();
 // 2、获得锁 setnx key value time type
 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+produceId, threadId, 30, TimeUnit.SECONDS);
# 3、释放锁
// 获取锁标识
 String s = stringRedisTemplate.opsForValue().get(KEY_PREFIX + produceId);
 // 判断标识是否一致
 if (s.equals(threadId)){
  // 释放锁
stringRedisTemplate.delete(KEY_PREFIX + produceId);
        }

7.2、Redis分布式锁不可重入问题

在这里插入图片描述
在这里插入图片描述

7.3、基于Redission的分布式锁实现

引入依赖
 <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.17.3</version>
</dependency>

创建工具类

package com.tqq.lock.utils;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * 分布式锁
 */
@Slf4j
@Component
public class DistributedRedisLock {

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 加锁
     *
     * @param lockName
     * @return
     */
    public Boolean lock(String lockName) {
        // 1. 判断 redisclient是否为空
        if (redissonClient == null) {
            log.info("DistributedRedisLock redission  client  is  null");
            return false;
        }

        try {
            // 2. 加锁
            RLock lock = redissonClient.getLock(lockName);
            // 3. 添加过期时间
            // 加锁以后10秒钟自动解锁
            // 无需调用unlock方法手动解锁
            lock.lock(10, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 释放锁
     *
     * @param lockName
     * @return
     */
    public Boolean unlock(String lockName) {
        // 1. 判断 redisclient是否为空
        if (redissonClient == null) {
            log.info("DistributedRedisLock redission  client  is  null");
            return false;
        }
        try {
            RLock lock = redissonClient.getLock(lockName);
            lock.unlock();
            // 释放锁
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}

业务方法

@Override
    @Transactional(rollbackFor = Exception.class)
    public String createOrderRedission(Integer produceId, Integer purchaseNum) {
        // 1. 加锁
        String key = "lock:";
        Boolean lock = distributedRedisLock.lock(key.concat(produceId + ""));
        // 2. 判断是否获取到锁
        if (!lock) {
            return "失败";
        }
        try {
            // 1、根据商品id获取商品信息
            // 2、判断商品是否存在
            // 3、校验库存
            // 4、更新库存操作
            // 5、创建订单
            // 6、创建订单和商品关联
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            distributedRedisLock.unlock(key.concat(produceId + ""));
        }
        return "失败";
    }

8、zookeeper实现分布式锁

Zookeeper的结点Znode有四种类型
1、持久节点:默认的结点类型,创建结点的客户端与zookeeper断开连接后,该节点依旧存在;
2、持久节点顺序节点:所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号,持久节点顺序节点就是有顺序的持久结点;
3、临时结点:和持久结点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除;
4、顺序节点临时节点:有顺序的临时结点

创建临时顺序节点:

create -e -s /test123

-e :临时结点
-s:顺序节点
在这里插入图片描述

8.1、ZK分布式锁的实现原理

当第一个客户端请求过来时,zookeeper客户端会创建一个持久节点locks,若它(client1)想要获取锁,需要在locks节点下创建一个顺序节点lock1;然后客户端client1会查找locks下的所有临时顺序节点,看自己是不是最小的那一个,如果是,则获取锁。
这时如果又来了一个客户端client2前来尝试获取锁,他会在locks下创建一个临时顺序节点lock2,它会和client1一样在locks下查找,发现lock1才是最小的,获取锁失败。client2会想它靠前的节点lock1注册watch时间,用来监听lock1是否存在。即client2抢锁失败进入等待状态。
如果在来client3,也会向client2一样创建、查找、监听。
在这里插入图片描述
如果任务完成,client1会显式调用删除lock1的指令;
如果客户端故障了,根据临时结点的特性,lock1会自动删除的;
lock1删除,client2会立刻接收到通知,它会再在locks下进行扫描,发现lock2是最小的,获得锁。
在这里插入图片描述在这里插入图片描述

  • Zookeeper设计定位就是分布式协调,如果获取不到锁,只需要添加一个监听器就可以,很适合做分布式锁
  • Zookeeper做分布式锁,如果有很多的客户端频繁的申请锁、释放锁,对于Zookeeper集群的压力会比较大。

8.2、Apache Curator 简介

Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化ZooKeeper的操作。

Curator主要解决了三类问题
  • 封装Zookeeper Client 与Zookeeper Server之间的连接处理;
  • 提供了一套Fluent风格的操作API
  • 提供了Zookeeper各种应用场景(如分布式锁、领导集群选举、缓存机制、分布式队列等)的抽象分装。
Curator主要从以下几个方面降低了zk使用的复杂性

在这里插入图片描述

引入依赖
 <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>5.2.0</version>
        </dependency>
配置类
@Configuration
public class ZookeeperConfig {


    /**
     * 创建Cuator客户端
     * @return
     */
    @Bean
    public CuratorFramework getZkClient(){
        // 1. 创建cuator客户端
        CuratorFramework client = CuratorFrameworkFactory.builder()
                // zk链接地址
                .connectString("localhost:2181")
                // 回话超时时间
                .sessionTimeoutMs(5000)
                // 链接创建超时时间
                .connectionTimeoutMs(5000)
                // 重试策略
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        return client;
    }

}
业务代码
 @Transactional(rollbackFor = Exception.class)
    @Override
    public String createOrderZooKeeper(Integer produceId, Integer purchaseNum) throws Exception {
        //InterProcessMutex公平锁
        //client cruator 中zk客户端对象 path强锁路径同一个锁path需要一致
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/lockPath");
        //尝试获取锁,第一个参数 时间数字  第二个参数 时间单位
        if (lock.acquire(5,TimeUnit.SECONDS)){
            try {
                // 1、根据商品id获取商品信息
                // 2、判断商品是否存在
                // 3、校验库存
                // 4、更新库存操作
                // 6、创建订单和商品关联
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.release();
            }
        }
        return "下单失败";
    }

9、 三种锁的对比

数据库分布式锁实现

优点:简单、使用方便;不需要引入Redis、Zookeeper等中间件;
缺点:不适合高并发场景、db操作性能较差。

9.1、Redis分布式锁实现

优点:性能好,适合高并发场景;较轻量级;有较好的框架支持,如Redisson
缺点:过期时间不好控制;需要考虑锁被其他献策会给你误删的场景

9.2、 Zookeeper分布式锁实现

优点:有较好的性能和可靠性,有封装较好的框架,如Curator;
缺点:性能不如Redis实现的分布式锁;比较重的分布式锁;

性能角度:Redis>Zookeeper>数据库
实现复杂程度:ZooKeeper>Redis>数据库
可靠性:ZooKeeper>Redis>数据库

原码:https://gitee.com/hellotqq/distributed/tree/master/lock/lock

Logo

鸿蒙生态一站式服务平台。

更多推荐