延迟任务,相信大家都不陌生,很多业务场景都会用到。
什么是延迟任务?
延迟任务实现
方案1、无限循环实现延迟任务
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 延迟任务执行方法汇总
*/
public class DelayTaskExample {
// 存放定时任务
private static Map<String, Long> _TaskMap = new HashMap<>();
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.now());
// 添加定时任务
_TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延迟 3s
// 调用无限循环实现延迟任务
loopTask();
}
/**
* 无限循环实现延迟任务
*/
public static void loopTask() {
Long itemLong = 0L;
while (true) {
Iterator it = _TaskMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
itemLong = (Long) entry.getValue();
// 有任务需要执行
if (Instant.now().toEpochMilli() >= itemLong) {
// 延迟任务,业务逻辑执行
System.out.println("执行任务:" + entry.getKey() +
" ,执行时间:" + LocalDateTime.now());
// 删除任务
_TaskMap.remove(entry.getKey());
}
}
}
}
}
程序启动时间:2020-04-12T18:51:28.188
执行任务:task-1 ,执行时间:2020-04-12T18:51:31.189
Java API 实现延迟任务
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.now());
scheduledExecutorServiceTask();
}
/**
* ScheduledExecutorService 实现固定频率一直循环执行任务
*/
public static void scheduledExecutorServiceTask() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
// 执行任务的业务代码
System.out.println("执行任务" +
" ,执行时间:" + LocalDateTime.now());
}
},
2, // 初次执行间隔
2, // 2s 执行一次
TimeUnit.SECONDS);
}
}
程序启动时间:2020-04-12T21:28:10.416
执行任务 ,执行时间:2020-04-12T21:28:12.421
执行任务 ,执行时间:2020-04-12T21:28:14.422
......
public class DelayTest {
public static void main(String[] args) throws InterruptedException {
DelayQueue delayQueue = new DelayQueue();
// 添加延迟任务
delayQueue.put(new DelayElement(1000));
delayQueue.put(new DelayElement(3000));
delayQueue.put(new DelayElement(5000));
System.out.println("开始时间:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 执行延迟任务
System.out.println(delayQueue.take());
}
System.out.println("结束时间:" + DateFormat.getDateTimeInstance().format(new Date()));
}
static class DelayElement implements Delayed {
// 延迟截止时间(单面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 获取剩余时间
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
// 队列里元素的排序依据
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}
}
开始时间:2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
结束时间:2020-4-12 20:40:43
方案5、通过键空间通知
import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;
public class DelayQueueExample {
// zset key
private static final String _KEY = "myDelayQueue";
public static void main(String[] args) throws InterruptedException {
Jedis jedis = JedisUtils.getJedis();
// 延迟 30s 执行(30s 后的时间)
long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
jedis.zadd(_KEY, delayTime, "order_1");
// 继续添加测试数据
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
// 开启延迟队列
doDelayQueue(jedis);
}
/**
* 延迟队列消费
* @param jedis Redis 客户端
*/
public static void doDelayQueue(Jedis jedis) throws InterruptedException {
while (true) {
// 当前时间
Instant nowInstant = Instant.now();
long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间
long nowSecond = nowInstant.getEpochSecond();
// 查询当前时间的所有任务
Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
for (String item : data) {
// 消费任务
System.out.println("消费:" + item);
}
// 删除已经执行的任务
jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
Thread.sleep(1000); // 每秒轮询一次
}
}
}
方案6、Netty 实现延迟任务import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;
public class TaskExample {
public static final String _TOPIC = "[email protected]__:expired"; // 订阅频道名称
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 执行定时任务
doTask(jedis);
}
/**
* 订阅过期消息,执行定时任务
* @param jedis Redis 客户端
*/
public static void doTask(Jedis jedis) {
// 订阅过期消息
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
// 接收到消息,执行定时任务
System.out.println("收到消息:" + message);
}
}, _TOPIC);
}
}
<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.48.Final</version>
</dependency>
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.now());
NettyTask();
}
/**
* 基于 Netty 的延迟任务
*/
private static void NettyTask() {
// 创建延迟任务实例
HashedWheelTimer timer = new HashedWheelTimer(3, // 时间间隔
TimeUnit.SECONDS,
100); // 时间轮中的槽数
// 创建一个任务
TimerTask task = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("执行任务" +
" ,执行时间:" + LocalDateTime.now());
}
};
// 将任务添加到延迟队列中
timer.newTimeout(task, 0, TimeUnit.SECONDS);
}
}
程序启动时间:2020-04-13T10:16:23.033
执行任务 ,执行时间:2020-04-13T10:16:26.118
方案7、8 MQ 实现延迟任务
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.goods.order";
final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默认的交换机
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//参数二为类型:必须是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 绑定队列到交换器
@Bean
Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("发送时间:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 3000);
return message;
}
});
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收时间:" + sdf.format(new Date()));
System.out.println("消息内容:" + msg);
}
}
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
@Autowired
private DelayedSender sender;
@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Hi Admin.");
Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
}
}
发送时间:2020-04-13 20:47:51
接收时间:2020-04-13 20:47:54
消息内容:Hi Admin.
方案9、使用 Spring 定时任务
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Component
public class ScheduleJobs {
@Scheduled(fixedDelay = 2 * 1000)
public void fixedDelayJob() throws InterruptedException {
System.out.println("任务执行,时间:" + LocalDateTime.now());
}
}
任务执行,时间:2020-04-13T14:07:53.349
任务执行,时间:2020-04-13T14:07:55.350
任务执行,时间:2020-04-13T14:07:57.351
...
方案10、Quartz 实现延迟任务
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.time.LocalDateTime;
public class SampleJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext)
throws JobExecutionException {
System.out.println("任务执行,时间:" + LocalDateTime.now());
}
}
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SampleScheduler {
@Bean
public JobDetail sampleJobDetail() {
return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
.storeDurably().build();
}
@Bean
public Trigger sampleJobTrigger() {
// 3s 后执行
SimpleScheduleBuilder scheduleBuilder =
SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
.withSchedule(scheduleBuilder).build();
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
/**
* SpringBoot 项目启动后执行
*/
public class MyStartupRunner implements CommandLineRunner {
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private SampleScheduler sampleScheduler;
@Override
public void run(String... args) throws Exception {
// 启动定时任务
schedulerFactoryBean.getScheduler().scheduleJob(
sampleScheduler.sampleJobTrigger());
}
}
2020-04-13 19:02:12.331 INFO 17768 --- [ restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.815 seconds (JVM running for 3.088)
任务执行,时间:2020-04-13T19:02:15.019
从结果可以看出在项目启动 3s 之后执行了延迟任务。
- EOF -
1、延时消息常见实现方案
2、SpringBoot 定时任务踩坑记录
3、几种简单实用的分布式定时任务!
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️