一、前言

在使用Redis时,在高并发场景下会出现一些问题,常见的问题有:缓存击穿、缓存雪崩、缓存穿透,这三个问题也是面试时的高频问题。

二、缓存击穿

1、说明

缓存击穿是指,某一热点数据存储到redis中,该数据处于高并发场景下,如果此时该key过期失效,这样就会有大量的并发请求进入到数据库,对数据库产生大的压力,甚至会压垮数据库。

2、解决方案

针对于缓存击穿这种情况,常见的解决方案有两种:

  • 热数据不设置过期时间
  • 使用互斥锁,可以使用redisson的分布式锁实现,就是从redis中查询不到数据时,不要立刻去查数据库,而是先获取锁,获取到锁后再去查询数据库,而其他未获取到锁的请求进行重试,这样就可以确保只有一个查询数据库并且更新缓存的请求。

1-缓存击穿.png

三、缓存雪崩

1、说明

缓存雪崩的情况往往是由两种情况产生:

  • 情况1:由于大量 key 设置了相同的过期时间(数据在缓存和数据库都存在),一旦到达过期时间点,这些 key 集体失效,造成访问这些 key 的请求全部进入数据库。
  • 情况2:Redis 实例宕机,大量请求进入数据库

2、解决方案

针对于雪崩问题,可以分情况进行解决:

  • 情况1的解决方案
    • 错开过期时间:在过期时间上加上随机值(比如 1~5 分钟)
    • 服务降级:暂停非核心数据查询缓存,返回预定义信息(错误页面,空值等)
  • 情况2的解决方案
    • 事前预防:搭建高可用集群
    • 构建多级缓存,实现成本稍高
    • 熔断:通过监控一旦雪崩出现,暂停缓存访问待实例恢复,返回预定义信息(有损方案)
    • 限流:通过监控一旦发现数据库访问量超过阈值,限制访问数据库的请求数(有损方案)

3、实现

我们将针对【情况1】的解决方案进行实现,主要是在默认的时间基础上随机增加1-10分钟有效期时间。
需要注意的是,使用SpringCache的@Cacheable注解是无法指定有效时间的,所以需要自定义RedisCacheManager对有效期时间进行随机设置。
自定义RedisCacheManager:


/**
 * 自定义CacheManager,用于设置不同的过期时间,防止雪崩问题的发生
 */
public class MyRedisCacheManager extends RedisCacheManager {

    public MyRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) {
        super(cacheWriter, defaultCacheConfiguration);
    }

    @Override
    protected RedisCache createRedisCache(String name, RedisCacheConfiguration cacheConfig) {
        //获取到原有过期时间
        Duration duration = cacheConfig.getTtl();
        if (ObjectUtil.isNotEmpty(duration)) {
            //在原有时间上随机增加1~10分钟
            Duration newDuration = duration.plusMinutes(RandomUtil.randomInt(1, 11));
            cacheConfig = cacheConfig.entryTtl(newDuration);
        }
        return super.createRedisCache(name, cacheConfig);
    }
}

  • 使用MyRedisCacheManager:
    @Bean
    public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
        // 默认配置
        RedisCacheConfiguration defaultCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                // 设置key的序列化方式为字符串
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
                // 设置value的序列化方式为json格式
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()))
                .disableCachingNullValues() // 不缓存null
                .entryTtl(Duration.ofHours(redisTtl));  // 默认缓存数据保存1小时

        // 构redis缓存管理器
        // RedisCacheManager redisCacheManager = RedisCacheManager.RedisCacheManagerBuilder
        //         .fromConnectionFactory(redisTemplate.getConnectionFactory())
        //         .cacheDefaults(defaultCacheConfiguration)
        //         .transactionAware()
        //         .build();

        //使用自定义缓存管理器
        RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
        MyRedisCacheManager myRedisCacheManager = new MyRedisCacheManager(redisCacheWriter, defaultCacheConfiguration);
        myRedisCacheManager.setTransactionAware(true); // 只在事务成功提交后才会进行缓存的put/evict操作
        return myRedisCacheManager;
    }

四、缓存穿透

1、说明

缓存穿透是指,如果一个 key 在缓存和数据库都不存在,那么访问这个 key 每次都会进入数据库

  • 很可能被恶意请求利用
  • 缓存雪崩与缓存击穿都是数据库中有,但缓存暂时缺失
  • 缓存雪崩与缓存击穿都能自然恢复,但缓存穿透则不能

2、解决方案

针对缓存穿透,一般有两种解决方案,分别是:

  • 如果数据库没有,也将此不存在的 key 关联 null 值放入缓存,缺点是这样的 key 没有任何业务作用,白占空间
  • 采用BloomFilter(布隆过滤器)解决,基本思路就是将存在数据的哈希值存储到一个足够大的Bitmap(Bit为单位存储数据,可以大大节省存储空间)中,在查询redis时,先查询布隆过滤器,如果数据不存在直接返回即可,如果存在的话,再执行缓存中命中、数据库查询等操作。

3、布隆过滤器

布隆过滤器(Bloom Filter)是1970年由布隆提出的,它实际上是一个很长的二进制向量和一系列随机映射函数,既然是二进制,那存储的数据不是0就是1,默认是0。
可以把它看作是这样的:

2-布隆过滤器.png

需要将数据存入隆过滤器中,才能判断是否存在,存入时要通过哈希算法计算数据的哈希值,通过哈希值确定存储都哪个位置。如下:

3-布隆过滤器.png

说明:数据hello通过哈希算法计算哈希值,假设得到的值为8,这个值就是存储到布隆过滤器下标值。

如何判断数据存在或者不存在呢?和存储道理一样,假设判断【java】数据是否存在,首先通过哈希算法计算哈希值,通过下标判断值是0还是1,如果是0就不存在,1就存在。

看到这里,你一定会有这样的疑问,不同的数据经过哈希算法计算,可能会得到相同的值,也就是,【张三】和【王五】可能会得到相同的hash值,会在同一个位置标记为1,这样的话,1个位置可能会代表多个数据,也就是会出现误判,没错,这个就是布隆过滤器最大的一个缺点,也是不可避免的特性。正因为这个特性,所以布隆过滤器基本是不能做删除动作的。

这里可以得出一个结论,使用布隆过滤器能够判断一定不存在,而不能用来判断一定存在。
布隆过滤器虽然不能完全避免误判,但是可以降低误判率,如何降低误判率呢?就是增加多个哈希算法,计算多个hash值,因为不同的值,经过多个哈希算法计算得到相同值的概率要低一些。

4-布隆过滤器.png

说明:可以看到,【hello】值经过3个哈希算法(实际不止3个)会计算出3个值,分别以这些值为坐标,标记数据为1,当判断值存在时,同样要经过这3个哈希算法计算3个哈希值,对应的都为1说明数据可能存在,如果其中有一个为0,就说明数据一定不存在。
在这里也能看出布隆过滤器的另外一个特性,哈希算法越多,误判率越低,但是所占用的空间越多,查询效率将越低。

总结下布隆过滤器的优缺点:

  • 优点
    • 存储的二进制数据,1或0,不存储真实数据,空间占用比较小且安全。
    • 插入和查询速度非常快,因为是基于数组下标的,类似HashMap,其时间复杂度是O(K),其中k是指哈希算法个数。
  • 缺点
    • 存在误判,可以通过增加哈希算法个数降低误判率,不能完全避免误判。
    • 删除困难,因为一个位置可能会代表多个值,不能做删除。

牢记结论:布隆过滤器能够判断一定不存在,而不能用来判断一定存在。

4、实现

关于布隆过滤器的使用,建议使用Google的Guava 或 Redission基于Redis实现,前者是在单体架构下比较适合,后者更适合在分布式场景下,便于多个服务节点之间共享。

Redission基于Redis,使用string类型数据,生成二进制数组进行存储,最大可用长度为:4294967294。

  • 引入Redission依赖:
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
</dependency>
  • Redission配置

@Configuration
public class RedissonConfiguration {

    @Resource
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonSingle() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
        if (null != (redisProperties.getTimeout())) {
            serverConfig.setTimeout(1000 * Convert.toInt(redisProperties.getTimeout().getSeconds()));
        }
        if (StrUtil.isNotEmpty(redisProperties.getPassword())) {
            serverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }

}

  • 自定义布隆过滤器配置:
package com.sl.transport.info.config;

import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * 布隆过滤器相关配置
 */
@Getter
@Configuration
public class BloomFilterConfig {

    /**
     * 名称,默认:sl-bloom-filter
     */
    @Value("${bloom.name:sl-bloom-filter}")
    private String name;

    /**
     * 布隆过滤器长度,最大支持Integer.MAX_VALUE*2,即:4294967294,默认:1千万
     */
    @Value("${bloom.expectedInsertions:10000000}")
    private long expectedInsertions;

    /**
     * 误判率,默认:0.05
     */
    @Value("${bloom.falseProbability:0.05d}")
    private double falseProbability;

}
  • 定义BloomFilterService接口:

/**
 * 布隆过滤器服务
 */
public interface BloomFilterService {

    /**
     * 初始化布隆过滤器
     */
    void init();

    /**
     * 向布隆过滤器中添加数据
     *
     * @param obj 待添加的数据
     * @return 是否成功
     */
    boolean add(Object obj);

    /**
     * 判断数据是否存在
     *
     * @param obj 数据
     * @return 是否存在
     */
    boolean contains(Object obj);

}

  • 编写实现类


@Service
public class BloomFilterServiceImpl implements BloomFilterService {

    @Resource
    private RedissonClient redissonClient;
    @Resource
    private BloomFilterConfig bloomFilterConfig;

    private RBloomFilter<Object> getBloomFilter() {
        return this.redissonClient.getBloomFilter(this.bloomFilterConfig.getName());
    }

    @Override
    @PostConstruct // spring启动后进行初始化
    public void init() {
        RBloomFilter<Object> bloomFilter = this.getBloomFilter();
        bloomFilter.tryInit(this.bloomFilterConfig.getExpectedInsertions(), this.bloomFilterConfig.getFalseProbability());
    }

    @Override
    public boolean add(Object obj) {
        return this.getBloomFilter().add(obj);
    }

    @Override
    public boolean contains(Object obj) {
        return this.getBloomFilter().contains(obj);
    }
}

  • 优化查询方法
    改造TransportInfoController的查询逻辑,如果布隆过滤器中不存在直接返回即可,无需进行缓存命中。
  • 注意,在新增数据时先要调用布隆过滤器新增方法,才能保证在查询时去布隆过滤器中先查找有效
    @ApiImplicitParams({
            @ApiImplicitParam(name = "transportOrderId", value = "运单id")
    })
    @GetMapping("{transportOrderId}")
    public TransportInfoDTO queryByTransportOrderId(@PathVariable("transportOrderId") String transportOrderId) {
        //如果布隆过滤器中不存在,无需缓存命中,直接返回即可
        boolean contains = this.bloomFilterService.contains(transportOrderId);
        if (!contains) {
            throw new SLException(ExceptionEnum.NOT_FOUND);
        }
        TransportInfoDTO transportInfoDTO = transportInfoCache.get(transportOrderId, id -> {
            //未命中,查询MongoDB
            TransportInfoEntity transportInfoEntity = this.transportInfoService.queryByTransportOrderId(id);
            //转化成DTO
            return BeanUtil.toBean(transportInfoEntity, TransportInfoDTO.class);
        });

        if (ObjectUtil.isNotEmpty(transportInfoDTO)) {
            return transportInfoDTO;
        }
        throw new SLException(ExceptionEnum.NOT_FOUND);
    }