关于PostgreSQL数据库中的informat
558 2023-04-03 01:44:45
这里需要注意一下启动监听容器的方法,项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思,所以我们需要判断容器是否运行,如果运行则调用resume方法,否则调用start方法
@Component@EnableSchedulingpublic class TaskListener{ private static final Logger log= LoggerFactory.getLogger(TaskListener.class); @Autowired private KafkaListenerEndpointRegistry registry; @Autowired private ConsumerFactory consumerFactory; @Bean public ConcurrentKafkaListenerContainerFactory delayContainerFactory() { ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory(); container.setConsumerFactory(consumerFactory); //禁止自动启动 container.setAutoStartup(false); return container; } @KafkaListener(id = "durable", topics = "topic.quick.durable",containerFactory = "delayContainerFactory") public void durableListener(String data) { //这里做数据持久化的操作 log.info("topic.quick.durable receive : " + data); } //定时器,每天凌晨0点开启监听 @Scheduled(cron = "0 0 0 * * ?") public void startListener() { log.info("开启监听"); //判断监听容器是否启动,未启动则将其启动 if (!registry.getListenerContainer("durable").isRunning()) { registry.getListenerContainer("durable").start(); } registry.getListenerContainer("durable").resume(); } //定时器,每天早上10点关闭监听 @Scheduled(cron = "0 0 10 * * ?") public void shutDownListener() { log.info("关闭监听"); registry.getListenerContainer("durable").pause(); }}
参考:
https://www.jianshu.com/p/2447592ca5a9