package com.zy.integration.iot.task; import com.zy.integration.iot.publish.IotPublishService; import com.zy.iot.config.IotProperties; import com.zy.iot.entity.IotPublishRecord; import com.zy.iot.service.IotPublishRecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; /** * 待发布消息补偿任务。 * 负责扫描 PENDING/FAILURE 的记录继续推送,不对业务 ACK_FAILURE 做静默重发。 */ @Slf4j @Component public class IotPendingPublishScheduler { @Autowired private IotProperties iotProperties; @Autowired private IotPublishRecordService iotPublishRecordService; @Autowired private IotPublishService iotPublishService; /** * 小批量补发,避免单条异常消息拖住整批待发数据。 */ @Scheduled(cron = "0/5 * * * * ? ") private void execute() { if (!iotProperties.isEnabled()) { return; } List records = iotPublishRecordService.selectPendingPublishes(50); if (records == null || records.isEmpty()) { return; } for (IotPublishRecord record : records) { try { iotPublishService.publishRecordNow(record.getId()); } catch (Exception e) { log.error("publish pending iot record failed, id={}", record.getId(), e); } } } }