一、前期准备

1、悲观锁和乐观锁

  • 悲观锁(Pessimistic Lock)

    每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁

  • 乐观锁(Optimistic Lock)

    每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制

2、数据库准备

  • taskinfo 任务表

1-延迟任务实战.png

  • 实体类
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;


}
  • taskinfo_logs 任务日志表

2-延迟任务实战.png

  • 实体类
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;

    /**
     * 版本号,用乐观锁,每次更新完数据后,MybatisPlus会对该字段自增,实现乐观锁效果
     *
     * update taskinfo_logs set status=?,version=version+1 where task_id=? and version=?;
     */
    @Version
    private Integer version;

    /**
     * 状态 0=int 1=EXECUTED 2=CANCELLED
     */
    @TableField("status")
    private Integer status;


}

3、在启动类中添加mybatis-plus开启乐观锁支持:

/**
     * mybatis-plus乐观锁支持
     * @return
     */
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
    return interceptor;
}

3、添加任务

  • 定义ScheduleConstants常量类
/**
 * 任务状态常量类
 */
public class ScheduleConstants {

    //初始化状态
    public static final int SCHEDULED=0;   
    
	//已执行状态
    public static final int EXECUTED=1;   
    
	//已取消状态
    public static final int CANCELLED=2;   
    
	//未来数据key前缀
    public static String FUTURE="future:";   
    
	//当前数据key前缀
    public static String TOPIC="topic:";     
}
  • 定义TaskTypeEnum枚举类
@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

    WM_NEWS(1001, 1,"自媒体文章"),
    FETCH_NEWS(1002, 2,"爬虫文章");
    //任务类型
    private final int taskType; 
    //任务优先级
    private final int priority; 
    //任务说明
    private final String desc;
}

序列化工具对比

  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
  • **Protostuff:**google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类

Protostuff需要引导依赖:

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>
  • 定时任务业务层接口
/**
 * 定时任务业务接口
 */
public interface TaskService {

    /**
     * 添加任务
     * @param task   任务对象
     * @return       任务id
     */
    long addTask(Task task) ;
}
  • 业务层实现类
@Slf4j
@Service
public class TaskServiceImpl implements TaskService {

    @Autowired
    private TaskInfoMapper taskInfoMapper;

    @Autowired
    private TaskInfoLogsMapper taskInfoLogsMapper;

    @Autowired
    private CacheService cacheService;

    /**
     * redisKeu设计规范:
     *  1. 一定是唯一的
     *  2. 使用 : 拼接
     *  3. redisKey要包含业务数据
     *  例如:学生的数据列表 students:list
     *       指定id的学生数据 student:001
     *
     * 新增任务
     *
     * @param task   任务对象
     * @return
     */
    @Override
    public long addTask(Task task) {
        // 1. 把任务信息添加到数据库中
        saveTaskToDB(task);
        // 3. 把任务保存到Redis中
        saveTaskToCache(task);

        return task.getTaskId();
    }

    /**
     * 把当前任务添加到 当前任务队列缓存
     * 把未来任务添加到 未来任务队列缓存
     *
     * @param task 任务对象
     */
    private void saveTaskToCache(Task task) {
        // 3.1 获取任务的执行时间
        long executeTime = task.getExecuteTime();
        // 3.2 获取系统当前时间
        long currentTime = DateTime.now().getMillis();

        // 任务执行时间小于等于当前系统时间(任务执行时间已到,添加任务到当前任务队列)
        if (executeTime <= currentTime){
            //当前队列redisKey的设计格式:当前队列标识:任务类型:任务优先级 topic:1001:1
            String redisKey=ScheduleConstants.TOPIC+ task.getTaskType()+":"+ task.getPriority();
            // 把任务添加到当前任务队列
            cacheService.lLeftPush(redisKey, JSON.toJSONString(task) );
            log.info("添加任务到当前队列成功 ===> {}",task.getTaskId());
        }else {
            // 任务时间大于当前系统时间(任务时间未到,添加任务到未来任务队列)
            // 未来队列redisKey的设计格式:当前队列标识:任务类型:任务优先级 future:1001:1
            String redisKey=ScheduleConstants.FUTURE+ task.getTaskType()+":"+ task.getPriority();
            // 把未来任务添加到未来队列中,并设置任务的执行时间
            cacheService.zAdd(redisKey,JSON.toJSONString(task), executeTime);
            log.info("添加任务到未来队列成功 ===> {}",task.getTaskId());
        }
    }

    /**
     * 保存任务和任务日志到数据库中
     * @param task 任务对象
     * @return 任务对象
     */
    private void saveTaskToDB(Task task) {

        // 1.1 创建taskinfo对象
        Taskinfo taskinfo = new Taskinfo();
        // 1.2 把task对象复制给taskinfo
        BeanUtils.copyProperties(task,taskinfo);
        // 1.3 转换执行时间
        taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
        // 1.4 添加任务到数据库
        taskInfoMapper.insert(taskinfo);

        // 2.把任务日志添加到数据库中
        // 2.1 创建TaskinfoLogs对象
        TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
        // 2.2 把task对象复制给taskinfoLogs
        BeanUtils.copyProperties(taskinfo,taskinfoLogs);
        // 2.3 初始化乐观锁版本号
        taskinfoLogs.setVersion(1);
        // 2.4 初始化任务状态,待执行
        taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
        // 2.5 设置任务id
        task.setTaskId(taskinfo.getTaskId());
        // 2.6 把任务日志存储到数据库中
        taskInfoLogsMapper.insert(taskinfoLogs);
        log.info("添加任务到数据库中成功 ===> {}",task.getTaskId());
    }
}

二、未来任务定时刷新

1、定时支持

在线生成corn: https://cron.qqe2.com/

 @Scheduled(cron = "0 */1 * * * ?") 每一分钟执行一次

注意在启动类上添加注解

Scheduled  需要开启 @EnableScheduling  //开启任务调度

2、reids key值匹配

  • 方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

3-kys匹配.png

  • 方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

4-scan.png

3、未来任务定时刷新

  • 在TaskServiceImpl中添加方法
    /**
     * 定时任务:定时刷新未来任务队列缓存里的任务到当前任务队列缓存
     */
    @Scheduled(cron = "0/1 * * * * ?")
    public void refreshTask() {
        // 1. 获取未来队列中的任务
        Set<String> zSetKeySet = cacheService.scan(ScheduleConstants.FUTURE + "*");
        // 2. 遍历未来队列的KEY
        if (!zSetKeySet.isEmpty()) {
            for (String zSetKey : zSetKeySet) {
                // 3. 根据每个KEY从未来队列中取出执行时间已到的任务
                Set<String> taskSet = cacheService.zRangeByScore(zSetKey, 0, DateTime.now().getMillis());
                if (!taskSet.isEmpty()) {
                    // 4. 将取出的任务从未来队列中删除
                    cacheService.zRemove(zSetKey, taskSet);
                    // 切割未来队列的key,拼接出当前队列key
                    String[] split = zSetKey.split(":");
                    // 拼接当前队列的redisKey
                    String listKey = ScheduleConstants.TOPIC + split[1] + ":" + split[2];
                    // 5. 把取出的任务添加到当前队列
                    cacheService.lLeftPushAll(listKey, taskSet);
                }
            }
        }
    }

4、Redis管道优化

  • 普通redis客户端和服务器交互模式

5-管道优化.png

  • Pipeline请求模型

6-管道优化.png

  • 官方测试结果数据对比

7-管道优化.png

-** 优化未来任务定时刷新**

    /**
     * 定时任务:定时刷新未来任务队列缓存里的任务到当前任务队列缓存
     */
    @Scheduled(cron = "0/1 * * * * ?")
    public void refreshTask() {
        // 1. 获取未来队列中的任务
        Set<String> zSetKeySet = cacheService.scan(ScheduleConstants.FUTURE + "*");
        // 2. 遍历未来队列的KEY
        if (!zSetKeySet.isEmpty()) {
            for (String zSetKey : zSetKeySet) {
                // 3. 根据每个KEY从未来队列中取出执行时间已到的任务
                Set<String> taskSet = cacheService.zRangeByScore(zSetKey, 0, DateTime.now().getMillis());
                // 使用Redis管道技术
                cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                        if (!taskSet.isEmpty()) {
                            // 4. 将取出的任务从未来队列中删除
                            cacheService.zRemove(zSetKey, taskSet);
                            // 切割未来队列的key,拼接出当前队列key
                            String[] split = zSetKey.split(":");
                            // 拼接当前队列的redisKey
                            String listKey = ScheduleConstants.TOPIC + split[1] + ":" + split[2];
                            // 5. 把取出的任务添加到当前队列
                            cacheService.lLeftPushAll(listKey, taskSet);
                        }
                        return null;
                    }
                });
            }
        }
    }

5、线程池优化

  • 参考wemedia导入 ThreadPoolConfig类到config包下

  • 定时刷新任务TaskRefreshService业务层接口

/**
 * @Author :leaflei
 * @Version: 1.0
 * @Description :定时刷新任务
 */
public interface TaskRefreshService {
    /**
     * 异步刷新任务
     * @param zSetKeySet 未来队列的key集合
     */
    void refreshTask(Set<String> zSetKeySet);
}
  • 定时刷新任务TaskRefreshServiceImpl业务层实现类
/**
 * @Author :leaflei
 * @Version: 1.0
 * @Description :定时刷新任务
 */
@Service
public class TaskRefreshServiceImpl implements TaskRefreshService {

    @Autowired
    private CacheService cacheService;

    /**
     * 异步任务,定时刷新
     *
     * @param zSetKeySet 未来队列的key集合
     */
    @Async("taskExecutor")
    @Override
    public void refreshTask(Set<String> zSetKeySet) {
        for (String zSetKey : zSetKeySet) {
            // 3. 根据每个KEY从未来队列中取出执行时间已到的任务
            Set<String> taskSet = cacheService.zRangeByScore(zSetKey, 0, DateTime.now().getMillis());
            // 使用Redis管道技术
            cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                    if (!taskSet.isEmpty()) {
                        // 4. 将取出的任务从未来队列中删除
                        cacheService.zRemove(zSetKey, taskSet);
                        // 切割未来队列的key,拼接出当前队列key
                        String[] split = zSetKey.split(":");
                        // 拼接当前队列的redisKey
                        String listKey = ScheduleConstants.TOPIC + split[1] + ":" + split[2];
                        // 5. 把取出的任务添加到当前队列
                        cacheService.lLeftPushAll(listKey, taskSet);
                    }
                    return null;
                }
            });
        }
    }
}
  • 在TaskServiceImpl的refreshCache方法中提交异步任务到线程池
    @Autowired
    private TaskRefreshService taskRefreshService.;

    /**
     * 定时任务:定时刷新未来任务队列缓存里的任务到当前任务队列缓存
     */
    @Scheduled(cron = "0/1 * * * * ?")
    public void refreshTask() {
        // 1. 获取未来队列中的任务
        Set<String> zSetKeySet = cacheService.scan(ScheduleConstants.FUTURE + "*");
        // 2. 遍历未来队列的KEY
        if (!zSetKeySet.isEmpty()) {
            // 使用线程池,异步刷新任务,
            taskRefreshService.refreshTask(zSetKeySet);
        }
    }