Redis缓存问题
一、前言
在使用Redis时,在高并发场景下会出现一些问题,常见的问题有:缓存击穿、缓存雪崩、缓存穿透,这三个问题也是面试时的高频问题。
二、缓存击穿
1、说明
缓存击穿是指,某一热点数据存储到redis中,该数据处于高并发场景下,如果此时该key过期失效,这样就会有大量的并发请求进入到数据库,对数据库产生大的压力,甚至会压垮数据库。
2、解决方案
针对于缓存击穿这种情况,常见的解决方案有两种:
- 热数据不设置过期时间
- 使用互斥锁,可以使用redisson的分布式锁实现,就是从redis中查询不到数据时,不要立刻去查数据库,而是先获取锁,获取到锁后再去查询数据库,而其他未获取到锁的请求进行重试,这样就可以确保只有一个查询数据库并且更新缓存的请求。

三、缓存雪崩
1、说明
缓存雪崩的情况往往是由两种情况产生:
- 情况1:由于大量 key 设置了相同的过期时间(数据在缓存和数据库都存在),一旦到达过期时间点,这些 key 集体失效,造成访问这些 key 的请求全部进入数据库。
- 情况2:Redis 实例宕机,大量请求进入数据库
2、解决方案
针对于雪崩问题,可以分情况进行解决:
- 情况1的解决方案
- 错开过期时间:在过期时间上加上随机值(比如 1~5 分钟)
- 服务降级:暂停非核心数据查询缓存,返回预定义信息(错误页面,空值等)
- 情况2的解决方案
- 事前预防:搭建高可用集群
- 构建多级缓存,实现成本稍高
- 熔断:通过监控一旦雪崩出现,暂停缓存访问待实例恢复,返回预定义信息(有损方案)
- 限流:通过监控一旦发现数据库访问量超过阈值,限制访问数据库的请求数(有损方案)
3、实现
我们将针对【情况1】的解决方案进行实现,主要是在默认的时间基础上随机增加1-10分钟有效期时间。
需要注意的是,使用SpringCache的@Cacheable注解是无法指定有效时间的,所以需要自定义RedisCacheManager对有效期时间进行随机设置。
自定义RedisCacheManager:
/**
* 自定义CacheManager,用于设置不同的过期时间,防止雪崩问题的发生
*/
public class MyRedisCacheManager extends RedisCacheManager {
public MyRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) {
super(cacheWriter, defaultCacheConfiguration);
}
@Override
protected RedisCache createRedisCache(String name, RedisCacheConfiguration cacheConfig) {
//获取到原有过期时间
Duration duration = cacheConfig.getTtl();
if (ObjectUtil.isNotEmpty(duration)) {
//在原有时间上随机增加1~10分钟
Duration newDuration = duration.plusMinutes(RandomUtil.randomInt(1, 11));
cacheConfig = cacheConfig.entryTtl(newDuration);
}
return super.createRedisCache(name, cacheConfig);
}
}
- 使用MyRedisCacheManager:
@Bean
public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
// 默认配置
RedisCacheConfiguration defaultCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
// 设置key的序列化方式为字符串
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
// 设置value的序列化方式为json格式
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()))
.disableCachingNullValues() // 不缓存null
.entryTtl(Duration.ofHours(redisTtl)); // 默认缓存数据保存1小时
// 构redis缓存管理器
// RedisCacheManager redisCacheManager = RedisCacheManager.RedisCacheManagerBuilder
// .fromConnectionFactory(redisTemplate.getConnectionFactory())
// .cacheDefaults(defaultCacheConfiguration)
// .transactionAware()
// .build();
//使用自定义缓存管理器
RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
MyRedisCacheManager myRedisCacheManager = new MyRedisCacheManager(redisCacheWriter, defaultCacheConfiguration);
myRedisCacheManager.setTransactionAware(true); // 只在事务成功提交后才会进行缓存的put/evict操作
return myRedisCacheManager;
}
四、缓存穿透
1、说明
缓存穿透是指,如果一个 key 在缓存和数据库都不存在,那么访问这个 key 每次都会进入数据库
- 很可能被恶意请求利用
- 缓存雪崩与缓存击穿都是数据库中有,但缓存暂时缺失
- 缓存雪崩与缓存击穿都能自然恢复,但缓存穿透则不能
2、解决方案
针对缓存穿透,一般有两种解决方案,分别是:
- 如果数据库没有,也将此不存在的 key 关联 null 值放入缓存,缺点是这样的 key 没有任何业务作用,白占空间
- 采用BloomFilter(布隆过滤器)解决,基本思路就是将存在数据的哈希值存储到一个足够大的Bitmap(Bit为单位存储数据,可以大大节省存储空间)中,在查询redis时,先查询布隆过滤器,如果数据不存在直接返回即可,如果存在的话,再执行缓存中命中、数据库查询等操作。
3、布隆过滤器
布隆过滤器(Bloom Filter)是1970年由布隆提出的,它实际上是一个很长的二进制向量和一系列随机映射函数,既然是二进制,那存储的数据不是0就是1,默认是0。
可以把它看作是这样的:

需要将数据存入隆过滤器中,才能判断是否存在,存入时要通过哈希算法计算数据的哈希值,通过哈希值确定存储都哪个位置。如下:

说明:数据hello通过哈希算法计算哈希值,假设得到的值为8,这个值就是存储到布隆过滤器下标值。
如何判断数据存在或者不存在呢?和存储道理一样,假设判断【java】数据是否存在,首先通过哈希算法计算哈希值,通过下标判断值是0还是1,如果是0就不存在,1就存在。
看到这里,你一定会有这样的疑问,不同的数据经过哈希算法计算,可能会得到相同的值,也就是,【张三】和【王五】可能会得到相同的hash值,会在同一个位置标记为1,这样的话,1个位置可能会代表多个数据,也就是会出现误判,没错,这个就是布隆过滤器最大的一个缺点,也是不可避免的特性。正因为这个特性,所以布隆过滤器基本是不能做删除动作的。
这里可以得出一个结论,使用布隆过滤器能够判断一定不存在,而不能用来判断一定存在。
布隆过滤器虽然不能完全避免误判,但是可以降低误判率,如何降低误判率呢?就是增加多个哈希算法,计算多个hash值,因为不同的值,经过多个哈希算法计算得到相同值的概率要低一些。

说明:可以看到,【hello】值经过3个哈希算法(实际不止3个)会计算出3个值,分别以这些值为坐标,标记数据为1,当判断值存在时,同样要经过这3个哈希算法计算3个哈希值,对应的都为1说明数据可能存在,如果其中有一个为0,就说明数据一定不存在。
在这里也能看出布隆过滤器的另外一个特性,哈希算法越多,误判率越低,但是所占用的空间越多,查询效率将越低。
总结下布隆过滤器的优缺点:
- 优点
- 存储的二进制数据,1或0,不存储真实数据,空间占用比较小且安全。
- 插入和查询速度非常快,因为是基于数组下标的,类似HashMap,其时间复杂度是O(K),其中k是指哈希算法个数。
- 缺点
- 存在误判,可以通过增加哈希算法个数降低误判率,不能完全避免误判。
- 删除困难,因为一个位置可能会代表多个值,不能做删除。
牢记结论:布隆过滤器能够判断一定不存在,而不能用来判断一定存在。
4、实现
关于布隆过滤器的使用,建议使用Google的Guava 或 Redission基于Redis实现,前者是在单体架构下比较适合,后者更适合在分布式场景下,便于多个服务节点之间共享。
Redission基于Redis,使用string类型数据,生成二进制数组进行存储,最大可用长度为:4294967294。
- 引入Redission依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
- Redission配置
@Configuration
public class RedissonConfiguration {
@Resource
private RedisProperties redisProperties;
@Bean
public RedissonClient redissonSingle() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
if (null != (redisProperties.getTimeout())) {
serverConfig.setTimeout(1000 * Convert.toInt(redisProperties.getTimeout().getSeconds()));
}
if (StrUtil.isNotEmpty(redisProperties.getPassword())) {
serverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}
- 自定义布隆过滤器配置:
package com.sl.transport.info.config;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* 布隆过滤器相关配置
*/
@Getter
@Configuration
public class BloomFilterConfig {
/**
* 名称,默认:sl-bloom-filter
*/
@Value("${bloom.name:sl-bloom-filter}")
private String name;
/**
* 布隆过滤器长度,最大支持Integer.MAX_VALUE*2,即:4294967294,默认:1千万
*/
@Value("${bloom.expectedInsertions:10000000}")
private long expectedInsertions;
/**
* 误判率,默认:0.05
*/
@Value("${bloom.falseProbability:0.05d}")
private double falseProbability;
}
- 定义BloomFilterService接口:
/**
* 布隆过滤器服务
*/
public interface BloomFilterService {
/**
* 初始化布隆过滤器
*/
void init();
/**
* 向布隆过滤器中添加数据
*
* @param obj 待添加的数据
* @return 是否成功
*/
boolean add(Object obj);
/**
* 判断数据是否存在
*
* @param obj 数据
* @return 是否存在
*/
boolean contains(Object obj);
}
- 编写实现类
@Service
public class BloomFilterServiceImpl implements BloomFilterService {
@Resource
private RedissonClient redissonClient;
@Resource
private BloomFilterConfig bloomFilterConfig;
private RBloomFilter<Object> getBloomFilter() {
return this.redissonClient.getBloomFilter(this.bloomFilterConfig.getName());
}
@Override
@PostConstruct // spring启动后进行初始化
public void init() {
RBloomFilter<Object> bloomFilter = this.getBloomFilter();
bloomFilter.tryInit(this.bloomFilterConfig.getExpectedInsertions(), this.bloomFilterConfig.getFalseProbability());
}
@Override
public boolean add(Object obj) {
return this.getBloomFilter().add(obj);
}
@Override
public boolean contains(Object obj) {
return this.getBloomFilter().contains(obj);
}
}
- 优化查询方法
改造TransportInfoController的查询逻辑,如果布隆过滤器中不存在直接返回即可,无需进行缓存命中。 - 注意,在新增数据时先要调用布隆过滤器新增方法,才能保证在查询时去布隆过滤器中先查找有效
@ApiImplicitParams({
@ApiImplicitParam(name = "transportOrderId", value = "运单id")
})
@GetMapping("{transportOrderId}")
public TransportInfoDTO queryByTransportOrderId(@PathVariable("transportOrderId") String transportOrderId) {
//如果布隆过滤器中不存在,无需缓存命中,直接返回即可
boolean contains = this.bloomFilterService.contains(transportOrderId);
if (!contains) {
throw new SLException(ExceptionEnum.NOT_FOUND);
}
TransportInfoDTO transportInfoDTO = transportInfoCache.get(transportOrderId, id -> {
//未命中,查询MongoDB
TransportInfoEntity transportInfoEntity = this.transportInfoService.queryByTransportOrderId(id);
//转化成DTO
return BeanUtil.toBean(transportInfoEntity, TransportInfoDTO.class);
});
if (ObjectUtil.isNotEmpty(transportInfoDTO)) {
return transportInfoDTO;
}
throw new SLException(ExceptionEnum.NOT_FOUND);
}