Reids实战—黑马点评(三)秒杀篇

news/2024/4/19 10:38:08/文章来源:https://blog.csdn.net/yangsf_/article/details/129229571

Reids实战—黑马点评(三)秒杀篇

来自黑马的redis课程的笔记

【黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目】

在这里插入图片描述

目录

  • Reids实战—黑马点评(三)秒杀篇
    • 一、全局唯一ID
      • 小结
    • 二、实现优惠券秒杀下单
    • 三、超卖问题
      • 3.1 问题描述
      • 3.2 乐观锁和悲观锁
      • 3.3 乐观锁实现
        • 3.3.1 版本号法
        • 3.3.2 CAS(Compare And Swap)
      • 3.4 小结
    • 四、一人一单
      • 4.1 添加功能
      • 4.2 并发问题
      • 4.3 字符串问题
      • 4.4 Spring事务问题
        • 4.4.1 锁在事务内
        • 4.4.2 事务不生效
      • 4.5 集群模式下的一人一单问题
    • 五、分布式锁
      • 5.1 分布式锁概述
      • 5.2 基于redis实现分布式锁
        • 5.2.1 锁误删
        • 5.2.2 Lua脚本保证命令原子性
      • 5.3 小结
    • 六、Redisson
      • 6.1 使用Redisson分布式锁
      • 6.2 Redisson分布式锁原理
        • 6.2.1 可重入原理
        • 6.2.2 可重试原理
        • 6.2.3 解决超时释放
        • 6.2.4 保证主从一致
        • 6.2.5 小结
    • 七、redis优化秒杀
    • 八、Redis消息队列实现异步秒杀
      • 8.1 基于List结构模拟消息队列
      • 8.2 PubSub发布订阅模式
      • 8.3 基于Stream结构的消息队列
        • 8.3.1 基本使用
        • 8.3.2 消费者组
        • 8.3.3 改造秒杀业务
        • 8.3.4 小结

一、全局唯一ID

每一个订单都需要不同的ID,如何做到ID唯一?

如果似乎用自增:

  • id规律明显(安全问题)
  • 受单表数据量限制

为了解决这些问题,我们有了全局ID生成器,这是一种生成全局唯一ID(或者叫分布式唯一ID)的工具,所生成的ID满足以下特征:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性(复杂递增)

唯一性和递增性我们可以用redis自增来实现,恰好,redis本身就满足高可用、高性能,安全性如何解决?

方案:

在这里插入图片描述

用Long(数值效率比字符串高)类型,一个Long类型占64位(Java中),我们可以在这些bit位上做文章。

首先:从左数第一位,是符号位,我们的id永远为正数,所以让它永远为0。接下来的31位,用来存时间戳,保证安全性的同时,让id基本不可能重复。最后的32位,我们存普通的递增值。这个方案,理论上可以保证68年都不会有id重复。

为了避免最后的自增值超出redis自增限制,我们可以每天新建一个key用于自增(例如,yyyy:MM:dd),不仅解决了问题,还方便统计。

实现:

@Component
public class RedisIdWorker {private StringRedisTemplate stringRedisTemplate;// 起始的时间戳值private static final Long BEGIN_TIMESTAMP = 1640995200L;public RedisIdWorker (StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}/*** @param “业务key前缀“* */public long nextId(String keyPrefix) {LocalDateTime now = LocalDateTime.now();long  nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));long count = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + data);// 位运算提高效率return timestamp << 32 | count;}
}

起始的时间戳的值可以这样计算:

@Test
void timeTest() {LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);long epochSecond = time.toEpochSecond(ZoneOffset.UTC);System.out.println(epochSecond);
}

测试:

ExecutorService executorService = Executors.newFixedThreadPool(500);
@Test
void idTest() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task = () -> {for (int i = 0; i < 100; i++) {redisIdWorker.nextId("order");}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {executorService.submit(task);}// 必须阻塞 如果不阻塞,则程序结束时,我们的任务还没有结束latch.await();long end = System.currentTimeMillis();System.out.println(end - begin);
}

收获的小技巧:

  1. LocalDateTime的使用

    // 使用of可以快速获取指定时间的LocalDateTime对象
    LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
    // 使用toEpochSecond可以快速获取相应时间的时间戳
    long epochSecond = time.toEpochSecond(ZoneOffset.UTC);
    // 使用format方法快速格式化时间
    LocalDateTime now = LocalDateTime.now();
    String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    
  2. 并发包的小技巧

    // 快速创建线程池,但是阿里的Java开发规范不建议这样做
    ExecutorService executorService = Executors.newFixedThreadPool(500);
    // 快速创建一个任务,使用lambda表达式
    Runnable task = () -> {……
    };
    // 提交任务
    executorService.submit(task);
    
  3. 位运算

    更好的利用bit位来处理信息

小结

在这里插入图片描述

uuid效率较低,且不满足自增性。

Redis自增,就是刚刚的方案,很常用。

雪花算法,类似于刚刚的方案,区别在于雪花算法的最后32位是以机器时间为基准,Redis是自增。

数据库自增,也就是数据库版本的Redis方案,效率不如Redis。

二、实现优惠券秒杀下单

按照流程图,实现非常简单,但会出现非常多的问题(详见后文)。

小收获:

使用lambdaUpdate

// 减库存操作
lambdaUpdate().setSql("stock = stock -1").eq(SeckillVoucher::getVoucherId, voucherId).update();

三、超卖问题

3.1 问题描述

在刚刚的流程中,是有很多问题隐患的,比如,经典的超卖问题

在并发量较高的情况下,假如剩余最后一张票,在购买最后一张票的线程还未扣减库存时,其他线程进入,查询库存,都会查到还有余票的结果,此时再进行库存判断,并扣减库存,就会发生超卖。

在这里插入图片描述

如何解决超卖问题?

最简单的方法就是上锁,同一时间只让一个线程去执行查库存->扣减库存的操作。

3.2 乐观锁和悲观锁

加锁的话有两种锁可以加:

悲观锁:比较悲观,认为线程安全问题一定会发生,因此操作数据前先获取锁,确保线程串行执行。(如synchronized、Lock)

乐观锁:比较乐观,认为线程安全问题不一定发生,所以不加锁,而是在更新数据时去判断数据有没有被其他线程修改。若没有则更新,反之重试或异常。

3.3 乐观锁实现

悲观锁的解决方案较为简单,即在可能出问题的代码上加synchronized或Lock锁住。我们介绍一下乐观锁的解决方案:

常见两种方法:

3.3.1 版本号法

每次更新数据时,更新版本号,并判断版本号是否被修改。

在这里插入图片描述

3.3.2 CAS(Compare And Swap)

以数据本身为版本号

3.4 小结

秒杀场景下,悲观锁性能较差,乐观锁成功率较低。

小优化:更新库存时,不用判断库存是否和查询到的库存相同,只需要库存大于零即可,这样成功率大大提高。

在这里插入图片描述

四、一人一单

4.1 添加功能

某些业务我们需要用户只能购买一单,于是我们多加一个用户是否已经购买的判断:

在这里插入图片描述

从ThreadLocal中取出当前用户的id,根据用户和商品id查询用户是否已经有订单存在,若存在则失败。

在这里插入图片描述

4.2 并发问题

按照刚刚的业务流程,正常用户单线程操作的情况下是没有问题的,但是若是恶意用户,同时开多个线程访问我们的接口,则可能出现这样一种情况:

若多个线程同时进入该方法,在任一线程未保存订单前,都进行查询用户是否下单,得到的count都是0,拿到这个0后,再去判断是否一人一单则都会成功,都会下单,造成了一人多单的问题。

解决:给该方法加锁,因为涉及两个表的写操作,我们添加事务。

在这里插入图片描述

写完过后以看,这不成了悲观锁嘛!里面的乐观锁还拿来干嘛呢?

经过思考:我们只需要限制单个用户的并发操作,只需要锁住userId即可,于是我们不再锁整个方法。

该方法改进为:

@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {Long userid = UserHolder.getUser().getId();synchronized(userid.toString()) {......return Result.ok(voucherOrder.getId());   }
}

4.3 字符串问题

接着我们会发现锁不住,因为我们toString方法会创建一个新的字符串,不同的线程的userid.toString()出来的字符串不是同一个对象。

解决:使用intern方法

@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {Long userid = UserHolder.getUser().getId();synchronized(userid.toString().intern()) {......return Result.ok(voucherOrder.getId());   }
}

使用intern方法,该方法会从JVM常量池中找equals为true的字符串,意味着不同的线程都是同一个字符串。

4.4 Spring事务问题

spring的声明式事务是基于AOP实现的,意味着方法结束后才会提交事务。

4.4.1 锁在事务内

当synchronized在该方法内时,会出现这么一种情况:锁释放了,但事务未提交,事务未提交时,下一个线程进来,读到的是数据库未修改的快照,仍然会发生一人多单的问题,所以我们要在调用该方法处添加锁,或是使用编程式事务。

synchronized (userid.toString().intern()) {createVoucherOrder(voucherId);
}

4.4.2 事务不生效

正是因为spring事务是通过aop实现,而aop是通过动态代理实现。当我们直接调用该方法时,默认是this.createVoucherOrder(voucherId),使用的是this调用,而不是使用代理对象来调用,只有通过代理对象来调用,事务才会生效。

  1. 引入依赖

    <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId>
    </dependency>
    
  2. 暴露代理对象

    启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解

    @MapperScan("com.hmdp.mapper")
    @EnableAspectJAutoProxy(exposeProxy = true)
    @SpringBootApplication
    public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}
    }
    
  3. 获取代理对象,并使用代理对象调用方法

    synchronized (userid.toString().intern()) {// 确保事务生效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();proxy.createVoucherOrder(voucherId);
    }
    

4.5 集群模式下的一人一单问题

通过刚刚的几波操作,可以说是基本解决了单机部署情况下的一人一单问题,若是集群部署,仍然会有问题。

可以开两个服务debug一下:

(小坑:Intellij Idea 2022.3.1版本,断点调试打断点后,需要等待断点上的“√”出现才能启动服务,不然断点无效,多个服务建议多选并同时启动,不然要等上一个服务启动到“√”出现才能启动下一个服务。)

在这里插入图片描述

我们的 锁 锁的是userid,锁的是jvm常量池中的userid字符串。若是多个jvm,不是同一个常量池,自然就锁不住了。

五、分布式锁

解决集群情况下的并发问题,则需要一个多个进程能都使用的锁。

5.1 分布式锁概述

分布式锁:满足分布式系统或集群模式下多进程可见并互斥的锁。

在这里插入图片描述

分布式锁可以借助第三方中间件简单的实现:

在这里插入图片描述

5.2 基于redis实现分布式锁

通过redis的setnx命令可以轻松实现简单的互斥锁,因为redis是单线程的,不会存在并发问题。

在这里插入图片描述

通过set key value nx ex 10 来保证原子性,防止在设置ttl值前宕机,发生死锁。这是一种非阻塞的锁,拿不到锁立即返回false,没有重试机制。

5.2.1 锁误删

若只是简单的使用setnx和del来获取和释放锁,则会出现一些问题,例如,拿到锁的线程业务阻塞了,它的锁超时释放了,此时另一个线程拿到锁,在另一个线程还未释放锁时,它的业务完成,并删掉了另一个线程的锁。这样一来,第三个线程又能拿到锁。

在这里插入图片描述

所以我们需要有一种机制,防止锁的误删:

在这里插入图片描述

在释放锁时,再判断一下锁是否是自己的。

具体方案:在设置该锁的value时,我们使用threadid,因为在多个JVM中,threadid有可能相同,于是我们在threadid前拼接一个UUID。这样就保证了误删锁的情况不会再发生。

5.2.2 Lua脚本保证命令原子性

避免锁的误删后,我们的分布式锁就比较健壮了。但在某些极端场景下,仍会出现误删问题:

在释放锁时,会先判断是否是当前的锁(在redis中获取该锁的value),此时,因为一些原因(如GC)阻塞了,下一个线程拿到锁,此时阻塞结束,由于已经做过判断,上一个线程会把下一个线程拿到的锁误删,此时第三个线程又能拿到锁了。

在这里插入图片描述

解决方案:将判断和释放锁封装成一个原子操作。

使用Lua脚本可以完美解决这个问题。

在lua脚本中,可以直接使用redis.call()方法来编写redis命令,在redis中使用EVAL调用。有如下好处:

  1. 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延。
  2. 原子操作。Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务。
  3. 复用。客户端发送的脚本会永久存在redis中,这样其他客户端可以复用这一脚本,而不需要使用代码完成相同的逻辑。

在这里插入图片描述

注意:lua语言中,数组下标是从1开始。

了解lua脚本后,我们就可以开始编写lua脚本了。

在这里插入图片描述

根据需求:

在这里插入图片描述

简写:

在这里插入图片描述

接下来就是使用Java的redis客户端嗲用Lua脚本:

使用execute方法:

在这里插入图片描述

于是,我们写出了简单且健壮的分布式锁:

package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {private StringRedisTemplate stringRedisTemplate;// Lua脚本private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;// 锁的名字 key一般为业务名private String key;// 锁前缀private static final String KEY_PREFIX = "lock:";// 该区分不同的JVMprivate static final String ID_PREFIX = UUID.randomUUID().toString(true) + "";// 初始化Lua脚本static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}// 初始化keypublic SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.key = KEY_PREFIX + name;}/*** @param timeoutSec 锁持有的时长 过期自动释放 * @return*/@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId + "", timeoutSec, TimeUnit.SECONDS);return BooleanUtil.isTrue(success);}/*** 释放锁*/@Overridepublic void unlock() {stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(key),ID_PREFIX + Thread.currentThread().getId());}
}

5.3 小结

在这里插入图片描述

编码收获:

  1. 读取文件,若该文件是不变的,则在static代码块中初始化一次就好了。
  2. ClassPathResource()类快速获取Resource目录下的资源。

六、Redisson

现在我们基于String类型的setnx实现的分布式锁已经足够健壮,但仍有不足:

在这里插入图片描述

要解决这些问题,我们的基础版redis分布式锁就不好发力了。

但Redisson轻松解决:

在这里插入图片描述

Redisson中不仅有分布式锁,分布式锁只是它的一个子集 。

6.1 使用Redisson分布式锁

  1. 引依赖

    <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.19.3</version>
    </dependency>
    
  2. 配置

    如果使用yaml来配置,则会将redis的配置覆盖。我们使用redisson只是作为分布式锁使用,所以单独配置:

    @Configuration
    public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();// 单点配置,也可以用useClusterServers添加集群地址config.useSingleServer().setAddress("redis://192.168.0.122:6379").setPassword("123456");return Redisson.create(config);}
    }
    
  3. 使用

    private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userid = voucherOrder.getUserId();// 创建锁对象RLock lock = redissonClient.getLock("order:" + userid);// 尝试获取锁boolean isLock = lock.tryLock();// 是否拿到锁if (!isLock) {log.error("不能重复下单!");}try {proxy.createVoucherOrder(voucherOrder);} finally {lock.unlock();}
    }
    

    非常简单。

    也可以携带参数

在这里插入图片描述

6.2 Redisson分布式锁原理

redisson的分布式锁是如何解决这四个问题?

6.2.1 可重入原理

我们的基础版锁是不可重入的

redisson使用Hash结构轻松解决了这个问题

在这里插入图片描述

每重入一次,value+1,每次释放锁,value-1,value为0,则释放锁。非常巧妙

其底层都是lua脚本,保证命令原子性:

获取锁:

在这里插入图片描述

释放锁:

在这里插入图片描述

源码中lua脚本是写死了放在代码中的,可以去查看,和这些大差不差,释放锁的时候会发布一个释放信号(后文)。

6.2.2 可重试原理

我们基础版的redis是一个非阻塞式的锁,没有任何重试机制。redisson用PubSub模式解决了这个问题。

我们从redis源码中探究:

/*** 参数分别是等待时间,锁ttl,时间单位。我们不指定ttl时,默认为-1。  **/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();// 执行获取锁的方法,会返回锁的ttl,获取锁失败会返回nullLong ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 成功获取锁if (ttl == null) {return true;}// 获取失败,准备重试,先检查重试等待时间是否还有剩余time -= System.currentTimeMillis() - current;// 过期,获取锁失败,返回falseif (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 还可以重试,准备重试current = System.currentTimeMillis();// 订阅锁的释放信号CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);try {// 等待锁释放,获取信号subscribeFuture.get(time, TimeUnit.MILLISECONDS);// 超时 失败} catch (TimeoutException e) {if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. " +"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {subscribeFuture.whenComplete((res, ex) -> {if (ex == null) {unsubscribe(res, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;// 异常 失败} catch (ExecutionException e) {acquireFailed(waitTime, unit, threadId);return false;}// 获取锁释放信号成功try {// 再次判断等待时间是否过期time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 没有过期 尝试获取锁while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// 获取锁成功if (ttl == null) {return true;}// 失败 检查等待时间是否过期time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// 未过期 准备重试 currentTime = System.currentTimeMillis();// 等待释放锁的信号if (ttl >= 0 && ttl < time) {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}// 获得锁释放信号 再次检查等待时间time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {// 无论获取锁成功或失败,都要解除订阅unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);}//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

相比我们基础版redis分布式锁,有很多优点,例如:

  • 重试并不是无休止的重试,而是使用发布订阅模式,等待一个锁的释放信号再去重试,节约内存,性能。
  • 非常严谨,每次获取锁前,都要先判断重试等待时间是否过期。

6.2.3 解决超时释放

虽然锁超时释放可以解决业务异常带来的死锁问题,但是业务超时引发的自动释放也会产生线程安全问题。redisson使用看门狗机制解决了这个问题。

从redis源码中探究:

首先看获取锁的源码

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 参数和之前一致RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 默认情况 leaseTime为 -1 ,这个时候,redisson使用了一个变量internalLockLeaseTime,实际上是30 * 1000毫秒,也就是30秒// 该方法会执行lua脚本,尝试获取锁,若获取成功,返回null,获取失败,返回ttlttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// 判断是否获取成功if (ttlRemaining == null) {// 获取成功 刚才提到 默认情况下,leaseTime为 -1 所以这里走else分支if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 续约 该方法就是解决超时释放的关键scheduleExpirationRenewal(threadId);}}// 返回lua脚本返回的ttl或是nilreturn ttlRemaining;});return new CompletableFutureWrapper<>(f);
}

再来看最关键的续约的方法

protected void scheduleExpirationRenewal(long threadId) {// 用来存线程对应的锁ExpirationEntry entry = new ExpirationEntry();// 如果是重入的锁,则无法放入这个map,放入这个map的锁才有资格续约,会获取一个旧的entry,意思就是每一把锁在这个map中有且仅有一条记录ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);// 判断是否是重入的锁if (oldEntry != null) {// 是重入的锁,只做记录oldEntry.addThreadId(threadId);} else {// 不是重入的锁,记录并开始续约任务entry.addThreadId(threadId);try {// 续约 核心方法renewExpiration();} finally {// 线程终止,取消续约任务if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}
}

看看redisson是如何巧妙地续约的:

private void renewExpiration() {// 获取当前锁的信息ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());// 没有记录 不用续约if (ee == null) {return;}// 定时任务续约Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {// 检查是否需要续约ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 执行lua脚本续约,每次执行,都会重置有效期为30秒,续约成功返回ture 续约失败返回falseCompletionStage<Boolean> future = renewExpirationAsync(threadId);future.whenComplete((res, e) -> {// 异常 移除续约mapif (e != null) {log.error("Can't update lock {} expiration", getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}// 续约成功 继续续约if (res) {renewExpiration();} else {// 失败 取消定时任务 可以通过EXPIRATION_RENEWAL_MAP.get(getEntryName())快速获取定时任务并取消掉cancelExpirationRenewal(null);}});}// 每隔10秒重置一次}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}

redisson相比我们的基础版redis,利用看门狗机制,解决了因为业务阻塞而导致锁超时释放的安全问题。有以下优点:

  • 维护一个可以存锁信息和定时任务的EXPIRATION_RENEWAL_MAP,可以快速的找到对应的锁,进行续约或是取消续约操作。
  • 每10秒钟重置一次,若业务没有完成就无限重置,不会因为业务阻塞而超时释放,若是业务异常或宕机,则会马上取消掉定时任务,让锁超时释放,避免导致死锁。

6.2.4 保证主从一致

普通redis分布式锁:

在这里插入图片描述

假如在未同步时,主节点故障,从节点成为主节点后,数据不一致,锁丢失了,有线程安全隐患。

相比普通的redis分布式锁,redisson是如何保证主从一致性的呢?

redisson让每一个redis都是主节点 ,并在这些节点后面跟上从节点

在这里插入图片描述

在这里插入图片描述

即使某个节点宕机,造成了数据不一致,锁也不会失效,因为redisson的连锁,需要在每一个节点上都能获取锁,才算成功。

6.2.5 小结

单体redisson原理:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

七、redis优化秒杀

之前的业务流程,我们是按顺序同步执行的,其中包含了很多数据库的读写操作,并且还花了很多时间来保证线程安全,现在我们使用lua脚本保证一部分的线程安全(例如超卖问题就不用考虑了),再用java的阻塞队列来让耗时操作异步执行。

在这里插入图片描述

redis执行lua脚本是单线程的,所以线程安全。

我们需要做几件事:

  • 新增秒杀优惠券的同时,将优惠券库存存入redis。
  • 利用lua脚本判断库存,一人一单。
  • 如果有购买资格,则生成订单id返回,并将用户id等订单信息传入阻塞队列。
  • 开启线程任务,不断从阻塞队列中获取信息,操作数据库,保存订单。

做完后,会有一些问题:

在这里插入图片描述

由于阻塞队列是javautil包下的,占用的是jvm的内存,会受jvm内存的限制。

当阻塞队列中还有未处理订单,或是处理订单异常等订单丢失的情况时,会导致数据不一致。

八、Redis消息队列实现异步秒杀

为了解决以上问题,我们引入第三方的消息队列。好处就是不会受JVM内存限制。当然redis消息队列不好玩,一般都是用专门的mq中间件(rabbitmq,rocketmq,kafka等)。

redis提供了三种不同的实现消息队列的方式:

  1. list:基于list结构模拟消息队列。
  2. PubSub:发布订阅模式,基本的点对点消息模型。
  3. stream:比较完善的消息队列模型。(功能强大但复杂的一批)

在这里插入图片描述

8.1 基于List结构模拟消息队列

就是利用LPUSH、RPOP这些命令,只要出入口不一致即可。但要模拟阻塞队列,就需要LPUSH、BRPOP这些阻塞的命令了。

8.2 PubSub发布订阅模式

在这里插入图片描述

在这里插入图片描述

8.3 基于Stream结构的消息队列

8.3.1 基本使用

发消息:

在这里插入图片描述

读消息:

阻塞地读消息:

在这里插入图片描述

这就是stream结构地基本使用。

问题:

我们是使用ID来指定读取地消息,当ID为0时,代表从第一个消息开始读,当ID为“$”时,代表读取最新消息,若在处理某条消息时,有n(n>1)条消息进入队列,当那条消息处理完,下次读取只会读取最新地一条,会漏掉中间地消息,造成消息漏读。

8.3.2 消费者组

在这里插入图片描述

消费者组不仅可以加快消息地处理速度,还有一定的数据保护措施。但不得不吐槽,真的麻烦,想要保证数据安全,有时候不得不手动ACK。

创建消费者组:

在这里插入图片描述

读消息:

在这里插入图片描述

这里比较特别的是ID可以取值为“>”。

8.3.3 改造秒杀业务

// 在类初始化完毕后,就开始监听队列
@PostConstruct
public void init() {Runnable task = () -> {String queueName = "stream.orders";while (true) {try {// 1. 获取消息队列中的订单消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2. 判断消息是否获取成功if (list == null || list.isEmpty()) {// 2.1 获取失败 下一轮循环continue;}// 获取消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);// 3. 获取成功 下单handleVoucherOrder(voucherOrder);// 4. ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理消息异常", e);// 异常,从pending-list中取出消息重试handlePendingList();}}};SECKILL_ORDER_EXECUTOR.submit(task);
}

异常消息重试:

private void handlePendingList() {String queueName = "stream.orders";while (true) {try {// 1. 获取pending-list中的订单消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2. 判断消息是否获取成功if (list == null || list.isEmpty()) {// 2.1 获取失败 下一轮循环break;}// 获取消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);// 3. 获取成功 下单handleVoucherOrder(voucherOrder);// 4. ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list消息异常", e);}}
}

8.3.4 小结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_74290.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DevOps实战50讲-(1)彻底理解DevOps

持续坚持原创输出&#xff0c;点击蓝字关注我吧软件质量保障:所寫即所思&#xff5c;一个阿里质量人对测试的所感所悟。浅谈软件开发流程软件开发流程是从需求分析、设计、编码、测试到上线等一系列环节的步骤和活动。通常来说&#xff0c;软件开发流程可以分为以下几个阶段&am…

Python 多进程多线程线程池进程池协程

目录 一、线程与进程很简单的介绍 1.1 线程与进程的区别 二、多进程Process 2.1 多进程与多线程的区别 2.2 多进程为啥要使用队列 2.3 控制进程运行顺序 2.3.1 join &#xff0c; 2.3.1 daemon 守护进程 2.4 进程id 2.5 进程 存活状态is_alive() 2.5 实现自定义多…

计算机图形学:liang算法和Cyrus-Beck算法

其中Cyrus-Beck算法呢&#xff0c;是计算一根直线一个多边形的交线段&#xff1b;liang算法是Cyrus的一个特例&#xff0c;即多边形刚好是矩形&#xff1b;先看看Cyrus算法的思路【从别的博客找的图片】&#xff1a;这很容易理解&#xff0c;点积>0时就可能中内部嘛&#xf…

Pyinstaller 打包EXE(七) 百篇文章学PyQT

本文章是百篇文章学PyQT6的第七篇&#xff0c;本文讲述如何使用Pyinstaller打包UI界面和代码&#xff0c;将程序打包成EXE来更为方便的进行部署&#xff0c;在写博客和学习的过程中会遇到很多问题&#xff0c;例如&#xff1a;PyQT6在网上很多博客都是PyQT5、或者PyQT4大部分都…

Amazon S3 服务15岁生日快乐!

2021年3月14日&#xff0c;作为第一个发布的服务&#xff0c;Amazon S3 服务15周岁啦&#xff01;在中国文化里&#xff0c;15岁是个临界点&#xff0c;是从“舞勺之年”到“舞象之年”的过渡。相信对于 Amazon S3 和其他的云服务15周岁也将是其迎接更加美好未来的全新起点。亚…

java面试题-JVM内存结构

整体结构&#xff1a;1.说说JVM内存整体的结构&#xff1f;线程私有还是共享的&#xff1f;JVM&#xff08;Java Virtual Machine&#xff09;内存可以分为以下几个部分&#xff1a;程序计数器&#xff08;Program Counter Register&#xff09;&#xff1a;是线程私有的&#…

深入浅出解析ChatGPT引领的科技浪潮【AI行研商业价值分析】

Rocky Ding写在前面 【AI行研&商业价值分析】栏目专注于分享AI行业中最新热点/风口的思考与判断。也欢迎大家提出宝贵的意见或优化ideas&#xff0c;一起交流学习&#x1f4aa; 大家好&#xff0c;我是Rocky。 2022年底&#xff0c;ChatGPT横空出世&#xff0c;火爆全网&a…

Linux学习(8.6)文件与目录的默认权限与隐藏权限

目录 文件与目录的默认权限与隐藏权限 文件的默认权限&#xff1a;umask chattr (配置文件隐藏属性) lsattr (显示文件隐藏属性) 文件特殊权限&#xff1a; SUID, SGID, SBIT 观察文件类型&#xff1a;file 以下内容转载自鸟哥的Linux私房菜 文件与目录的默认权限与隐藏权…

【架构师】零基础到精通——架构发展

博客昵称&#xff1a;架构师Cool 最喜欢的座右铭&#xff1a;一以贯之的努力&#xff0c;不得懈怠的人生。 作者简介&#xff1a;一名Coder&#xff0c;软件设计师/鸿蒙高级工程师认证&#xff0c;在备战高级架构师/系统分析师&#xff0c;欢迎关注小弟&#xff01; 博主小留言…

【20230225】【剑指1】分治算法(中等)

1.重建二叉树class Solution { public:TreeNode* traversal(vector<int>& preorder,vector<int>& inorder){if(preorder.size()0) return NULL;int rootValuepreorder.front();TreeNode* rootnew TreeNode(rootValue);//int rootValuepreorder[0];if(preo…

redis秒杀

redis优惠券秒杀 为什么订单表订单ID不采用自增长&#xff1f; id规律性太明显&#xff0c;容易被用户猜测到&#xff08;比如第一天下订单id10&#xff0c;第二天下订单id100&#xff0c;在昨天的1天内只卖出90商品&#xff09;受单表数据量限制&#xff08;订单数据量大&am…

从零开始学习iftop流量监控(找出服务器耗费流量最多的ip和端口)

一、iftop是什么iftop是类似于top的实时流量监控工具。作用&#xff1a;监控网卡的实时流量&#xff08;可以指定网段&#xff09;、反向解析IP、显示端口信息等官网&#xff1a;http://www.ex-parrot.com/~pdw/iftop/二、界面说明>代表发送数据&#xff0c;< 代表接收数…

chatGPT模型原理

文章目录简介BertGPT 初代GPT-2GPT-3chatGPT开源ChatGPT简介 openai 的 GPT 大模型的发展历程。 Bert 2018年&#xff0c;自然语言处理 NLP 领域也步入了 LLM 时代&#xff0c;谷歌出品的 Bert 模型横空出世&#xff0c;碾压了以往的所有模型&#xff0c;直接在各种NLP的建模…

EasyRecovery16MAC苹果版本Photo最新版数据恢复软件

无论是在工作学习中&#xff0c;还是在生活中&#xff0c;Word、Excle等办公软件都是大家很常用的。我们在使用电脑的过程中&#xff0c;有时会因自己的误删或电脑故障&#xff0c;从而导致我们所写的文档丢失了。出现这样的大家不要着急&#xff0c;今天小编就给大家推荐一款可…

FreeRTOS优先级翻转

优先级翻转优先级翻转&#xff1a;高优先级的任务反而慢执行&#xff0c;低优先级的任务反而优先执行优先级翻转在抢占式内核中是非常常见的&#xff0c;但是在实时操作系统中是不允许出现优先级翻转的&#xff0c;因为优先级翻转会破坏任务的预期顺序&#xff0c;可能会导致未…

YOLOv5模型学习记录

新年伊始&#xff0c;YOLOv8横空出世&#xff0c;这个还未开源时便引发界内广泛热议的目标检测算法&#xff0c;一经问世便再次引发热潮&#xff0c;而作为与其师出同源的YOLOv5&#xff0c;自然要拿来与其比较一番。接下来我们便来学习一下吧。 模型结构 首先便是模型结构了…

Rasa 3.x 学习系列-摆脱意图:一种新的对话模式

Rasa 3.x 学习系列-摆脱意图:一种新的对话模式 在2019年的一篇文章中,Alan Nichol写道 :是时候摆脱意图了。一年后,Rasa发布了Rasa中的第一个无意图(或“端到端”)对话模型。现在,我们宣布迈出了一个重要的步伐,将LLM的强大功能带入Rasa的对话管理中。 首先,意图非常…

ACSC 2023 比赛复现

Admin Dashboard 在 index.php 中可以看到需要访问者是 admin 权限&#xff0c;才可以看到 flag。 report.php 中可以让 admin bot 访问我们输入的 url&#xff0c;那么也就是说可以访问 addadmin.php 添加用户。 在 addadmin.php 中可以添加 admin 用户&#xff0c;但是需…

李宏毅2023春季机器学习课程

目录2021&2022课程重磅须知我维护的其他项目更新日志课程地址课程资料直链课程作业直链其他优质课程2021&2022课程 CSDN Github 重磅须知 为方便所有网课资料与优质电子书籍的实时更新维护&#xff0c;创建一个在线实时网盘文件夹&#xff1b;   网盘获取方式&#…

mindspore的MLP模型(多层感知机)

导入模块 import hashlib import os import tarfile import zipfile import requests import numpy as np import pandas as pd import mindspore import mindspore.dataset as ds from mindspore import nn import mindspore.ops as ops import mindspore.numpy as mnp from …