package com.zy.acs.manager.core.scheduler;
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.zy.acs.manager.core.service.GuaranteeRuntimeService;
|
import com.zy.acs.manager.manager.entity.Guarantee;
|
import com.zy.acs.manager.manager.enums.StatusType;
|
import com.zy.acs.manager.manager.service.GuaranteeService;
|
import lombok.Data;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.support.CronExpression;
|
import org.springframework.stereotype.Component;
|
|
import java.time.LocalDateTime;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.stream.Collectors;
|
|
@Slf4j
|
@Component
|
public class GuaranteeScheduler {
|
|
private final GuaranteeService guaranteeService;
|
private final GuaranteeRuntimeService guaranteeRuntimeService;
|
|
@Value("${guarantee.scheduler.enabled:true}")
|
private boolean enabled;
|
@Value("${guarantee.scheduler.default-lead-minutes:60}")
|
private int defaultLeadMinutes;
|
@Value("${guarantee.scheduler.lock-duration-minutes:5}")
|
private int lockDurationMinutes;
|
@Value("${guarantee.scheduler.window-duration-minutes:10}")
|
private int windowDurationMinutes;
|
|
private final Map<Long, PlanRuntimeState> runtimeStates = new ConcurrentHashMap<>();
|
|
public GuaranteeScheduler(GuaranteeService guaranteeService,
|
GuaranteeRuntimeService guaranteeRuntimeService) {
|
this.guaranteeService = guaranteeService;
|
this.guaranteeRuntimeService = guaranteeRuntimeService;
|
}
|
|
@Scheduled(cron = "0/15 * * * * ?")
|
public void drive() {
|
if (!enabled) {
|
return;
|
}
|
List<Guarantee> plans = guaranteeService.list(new LambdaQueryWrapper<Guarantee>()
|
.eq(Guarantee::getStatus, StatusType.ENABLE.val));
|
syncCache(plans);
|
LocalDateTime now = LocalDateTime.now();
|
for (Guarantee plan : plans) {
|
try {
|
processPlan(plan, now);
|
} catch (Exception ex) {
|
log.error("GuaranteeScheduler failed for plan {}", plan.getName(), ex);
|
}
|
}
|
}
|
|
private void processPlan(Guarantee plan, LocalDateTime now) {
|
PlanRuntimeState state = runtimeStates.computeIfAbsent(plan.getId(), id -> new PlanRuntimeState());
|
if (state.getTargetTime() == null || now.isAfter(state.getWindowEnd())) {
|
LocalDateTime next = computeNext(plan, now);
|
if (next == null) {
|
runtimeStates.remove(plan.getId());
|
return;
|
}
|
state.init(plan, next, resolveLeadMinutes(plan));
|
}
|
if (now.isBefore(state.getPrepareStart())) {
|
state.setPhase(Phase.IDLE);
|
return;
|
}
|
if (now.isBefore(state.getLockStart())) {
|
state.transition(Phase.PREPARING, () -> guaranteeRuntimeService.prepare(plan, state.getTargetTime()));
|
return;
|
}
|
if (now.isBefore(state.getTargetTime())) {
|
state.transition(Phase.LOCKING, () -> guaranteeRuntimeService.lock(plan, state.getTargetTime()));
|
return;
|
}
|
if (now.isBefore(state.getWindowEnd())) {
|
state.transition(Phase.WINDOW, () -> guaranteeRuntimeService.checkWindow(plan, state.getTargetTime()));
|
guaranteeRuntimeService.checkWindow(plan, state.getTargetTime());
|
return;
|
}
|
if (state.getPhase() != Phase.FINISHED) {
|
state.transition(Phase.FINISHED, () -> guaranteeRuntimeService.finish(plan, state.getTargetTime()));
|
}
|
LocalDateTime next = computeNext(plan, state.getWindowEnd().plusSeconds(1));
|
if (next == null) {
|
runtimeStates.remove(plan.getId());
|
return;
|
}
|
state.init(plan, next, resolveLeadMinutes(plan));
|
}
|
|
private void syncCache(List<Guarantee> plans) {
|
Set<Long> activeIds = plans.stream().map(Guarantee::getId).collect(Collectors.toSet());
|
runtimeStates.keySet().removeIf(id -> !activeIds.contains(id));
|
}
|
|
private int resolveLeadMinutes(Guarantee plan) {
|
return Math.max(1, plan.getLeadTime() == null ? defaultLeadMinutes : plan.getLeadTime());
|
}
|
|
private LocalDateTime computeNext(Guarantee plan, LocalDateTime reference) {
|
String cronExpr = plan.getCronExpr();
|
if (cronExpr == null || cronExpr.trim().isEmpty()) {
|
return null;
|
}
|
try {
|
CronExpression cron = CronExpression.parse(cronExpr);
|
LocalDateTime base = reference == null ? LocalDateTime.now() : reference;
|
LocalDateTime next = cron.next(base);
|
if (next == null && !base.equals(LocalDateTime.now())) {
|
next = cron.next(LocalDateTime.now());
|
}
|
return next;
|
} catch (IllegalArgumentException ex) {
|
log.error("Guarantee[{}] invalid cron {} ", plan.getName(), cronExpr, ex);
|
return null;
|
}
|
}
|
|
private enum Phase {
|
IDLE,
|
PREPARING,
|
LOCKING,
|
WINDOW,
|
FINISHED
|
}
|
|
@Data
|
private class PlanRuntimeState {
|
private LocalDateTime targetTime;
|
private LocalDateTime prepareStart;
|
private LocalDateTime lockStart;
|
private LocalDateTime windowEnd;
|
private Phase phase = Phase.IDLE;
|
|
private void init(Guarantee plan, LocalDateTime target, int leadMinutes) {
|
this.targetTime = target;
|
this.prepareStart = target.minusMinutes(Math.max(1, leadMinutes));
|
int lockMinutes = Math.min(lockDurationMinutes, leadMinutes);
|
this.lockStart = target.minusMinutes(Math.max(1, lockMinutes));
|
this.windowEnd = target.plusMinutes(Math.max(1, windowDurationMinutes));
|
this.phase = Phase.IDLE;
|
log.info("Guarantee[{}] next target {} prepare@{} lock@{} windowEnd@{}",
|
plan.getName(), targetTime, prepareStart, lockStart, windowEnd);
|
}
|
|
private void transition(Phase targetPhase, Runnable callback) {
|
if (this.phase.ordinal() >= targetPhase.ordinal()) {
|
return;
|
}
|
callback.run();
|
this.phase = targetPhase;
|
}
|
}
|
}
|