//1 查询所有的进行中的任务列表 //间隔秒数:向后查询几秒的数据 List<UserPathTask> userPathTaskList = userPathTaskManager.list(Wrappers.<UserPathTask>lambdaQuery().eq(UserPathTask::getTaskStatus, UserPathTaskStatus.EXECUTING) .between(UserPathTask::getTaskStartTime, LocalDateTime.now(), LocalDateTime.now().plusSeconds(secondGap))); //2 判断redis中是否存在这些taskId作为key key-user:path:task:<taskId> value-任务ttl List<UserPathTask> sendTaskList = new ArrayList<>(); Set<String> keys = redisOps.keys(LayoutRedisKey.USER_PATH_TASK_INFO + "*");//user:path:task:* userPathTaskList.stream().forEach(pathTask -> { if (!keys.contains(LayoutRedisKey.USER_PATH_TASK_INFO + pathTask.getId())) {//2.2 redis中不存在则发送,并且存入redis sendTaskList.add(pathTask); Duration duration = Duration.between(LocalDateTime.now(), pathTask.getTaskEndTime());//每个任务ttl为结束时间减去当前时间 redisOps.setEx(LayoutRedisKey.USER_PATH_TASK_INFO + pathTask.getId(), JSONObject.toJSONString(pathTask), duration.toSeconds()); }//2.1 redis中存在则不发送消息 }); //3 查询redis是否存在固定key(代表需要检查当天) if (redisOps.exists(LayoutRedisKey.USER_PATH_TASK_RECHECK)) {//3.2 如果存在就 补漏 //3.2.1 查询00:00:00到now的 执行中的任务 List<UserPathTask> todayUserPathTaskList = userPathTaskManager.list(Wrappers.<UserPathTask>lambdaQuery().eq(UserPathTask::getTaskStatus, UserPathTaskStatus.EXECUTING) .between(UserPathTask::getTaskStartTime, DateUtil.getStartOfDay(LocalDateTime.now()), LocalDateTime.now())); //3.2.2 判断redis中是否存在 Set<String> newKeys = redisOps.keys(LayoutRedisKey.USER_PATH_TASK_INFO + "*");//user:path:task:* todayUserPathTaskList.stream().forEach(userPathTask -> { if (!newKeys.contains(LayoutRedisKey.USER_PATH_TASK_INFO + userPathTask.getId())) {// redis中不存在则发送,并且存入redis sendTaskList.add(userPathTask); Duration duration = Duration.between(LocalDateTime.now(), userPathTask.getTaskEndTime());//每个任务ttl为结束时间减去当前时间 redisOps.setEx(LayoutRedisKey.USER_PATH_TASK_INFO + userPathTask.getId(), JSONObject.toJSONString(userPathTask), duration.toSeconds()); }// redis中存在则不发送消息 }); } else {//3.1 如果不存在就添加 redisOps.setEx(LayoutRedisKey.USER_PATH_TASK_RECHECK, null, ttlSeconds); } sendTaskList.stream().forEach(userPathTask -> { kafkaUtil.sendKafkaMessage(userPathTask.getTaskCode(), TaskEvent.ON_TASK_START, userPathTask.getUserPathPackageId(), userPathTask.getUserPathId(), userPathTask.getUserModuleId(), userPathTask.getId(), null, null); });