package com.zy.integration.iot.task;
|
|
import com.zy.integration.iot.publish.IotPublishService;
|
import com.zy.iot.config.IotProperties;
|
import com.zy.iot.entity.IotPublishRecord;
|
import com.zy.iot.service.IotPublishRecordService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.util.List;
|
|
/**
|
* 待发布消息补偿任务。
|
* 负责扫描 PENDING/FAILURE 的记录继续推送,不对业务 ACK_FAILURE 做静默重发。
|
*/
|
@Slf4j
|
@Component
|
public class IotPendingPublishScheduler {
|
|
@Autowired
|
private IotProperties iotProperties;
|
@Autowired
|
private IotPublishRecordService iotPublishRecordService;
|
@Autowired
|
private IotPublishService iotPublishService;
|
|
/**
|
* 小批量补发,避免单条异常消息拖住整批待发数据。
|
*/
|
@Scheduled(cron = "0/5 * * * * ? ")
|
private void execute() {
|
if (!iotProperties.isEnabled()) {
|
return;
|
}
|
List<IotPublishRecord> records = iotPublishRecordService.selectPendingPublishes(50);
|
if (records == null || records.isEmpty()) {
|
return;
|
}
|
for (IotPublishRecord record : records) {
|
try {
|
iotPublishService.publishRecordNow(record.getId());
|
} catch (Exception e) {
|
log.error("publish pending iot record failed, id={}", record.getId(), e);
|
}
|
}
|
}
|
}
|