#
vincentlu
昨天 befebaf1b91877843acccc31d04f77f2258a501d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package com.zy.acs.manager.core.scheduler;
 
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zy.acs.manager.core.service.GuaranteeRuntimeService;
import com.zy.acs.manager.manager.entity.Guarantee;
import com.zy.acs.manager.manager.enums.StatusType;
import com.zy.acs.manager.manager.service.GuaranteeService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronExpression;
import org.springframework.stereotype.Component;
 
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
 
@Slf4j
//@Component
public class GuaranteeScheduler {
 
    private final GuaranteeService guaranteeService;
    private final GuaranteeRuntimeService guaranteeRuntimeService;
 
    @Value("${guarantee.scheduler.enabled:true}")
    private boolean enabled;
    @Value("${guarantee.scheduler.default-lead-minutes:60}")
    private int defaultLeadMinutes;
    @Value("${guarantee.scheduler.lock-duration-minutes:5}")
    private int lockDurationMinutes;
    @Value("${guarantee.scheduler.window-duration-minutes:10}")
    private int windowDurationMinutes;
 
    private final Map<Long, PlanRuntimeState> runtimeStates = new ConcurrentHashMap<>();
 
    public GuaranteeScheduler(GuaranteeService guaranteeService,
                              GuaranteeRuntimeService guaranteeRuntimeService) {
        this.guaranteeService = guaranteeService;
        this.guaranteeRuntimeService = guaranteeRuntimeService;
    }
 
    @Scheduled(cron = "0/15 * * * * ?")
    public void drive() {
        if (!enabled) {
            return;
        }
        List<Guarantee> plans = guaranteeService.list(new LambdaQueryWrapper<Guarantee>()
                .eq(Guarantee::getStatus, StatusType.ENABLE.val));
        syncCache(plans);
        LocalDateTime now = LocalDateTime.now();
        for (Guarantee plan : plans) {
            try {
                processPlan(plan, now);
            } catch (Exception ex) {
                log.error("GuaranteeScheduler failed for plan {}", plan.getName(), ex);
            }
        }
    }
 
    private void processPlan(Guarantee plan, LocalDateTime now) {
        PlanRuntimeState state = runtimeStates.computeIfAbsent(plan.getId(), id -> new PlanRuntimeState());
        if (state.getTargetTime() == null || now.isAfter(state.getWindowEnd())) {
            LocalDateTime next = computeNext(plan, now);
            if (next == null) {
                runtimeStates.remove(plan.getId());
                return;
            }
            state.init(plan, next, resolveLeadMinutes(plan));
        }
        if (now.isBefore(state.getPrepareStart())) {
            state.setPhase(Phase.IDLE);
            return;
        }
        if (now.isBefore(state.getLockStart())) {
            state.transition(Phase.PREPARING, () -> guaranteeRuntimeService.prepare(plan, state.getTargetTime()));
            return;
        }
        if (now.isBefore(state.getTargetTime())) {
            state.transition(Phase.LOCKING, () -> guaranteeRuntimeService.lock(plan, state.getTargetTime()));
            return;
        }
        if (now.isBefore(state.getWindowEnd())) {
            state.transition(Phase.WINDOW, () -> guaranteeRuntimeService.checkWindow(plan, state.getTargetTime()));
            guaranteeRuntimeService.checkWindow(plan, state.getTargetTime());
            return;
        }
        if (state.getPhase() != Phase.FINISHED) {
            state.transition(Phase.FINISHED, () -> guaranteeRuntimeService.finish(plan, state.getTargetTime()));
        }
        LocalDateTime next = computeNext(plan, state.getWindowEnd().plusSeconds(1));
        if (next == null) {
            runtimeStates.remove(plan.getId());
            return;
        }
        state.init(plan, next, resolveLeadMinutes(plan));
    }
 
    private void syncCache(List<Guarantee> plans) {
        Set<Long> activeIds = plans.stream().map(Guarantee::getId).collect(Collectors.toSet());
        runtimeStates.keySet().removeIf(id -> !activeIds.contains(id));
    }
 
    private int resolveLeadMinutes(Guarantee plan) {
        return Math.max(1, plan.getLeadTime() == null ? defaultLeadMinutes : plan.getLeadTime());
    }
 
    private LocalDateTime computeNext(Guarantee plan, LocalDateTime reference) {
        String cronExpr = plan.getCronExpr();
        if (cronExpr == null || cronExpr.trim().isEmpty()) {
            return null;
        }
        try {
            CronExpression cron = CronExpression.parse(cronExpr);
            LocalDateTime base = reference == null ? LocalDateTime.now() : reference;
            LocalDateTime next = cron.next(base);
            if (next == null && !base.equals(LocalDateTime.now())) {
                next = cron.next(LocalDateTime.now());
            }
            return next;
        } catch (IllegalArgumentException ex) {
            log.error("Guarantee[{}] invalid cron {} ", plan.getName(), cronExpr, ex);
            return null;
        }
    }
 
    private enum Phase {
        IDLE,
        PREPARING,
        LOCKING,
        WINDOW,
        FINISHED
    }
 
    @Data
    private class PlanRuntimeState {
        private LocalDateTime targetTime;
        private LocalDateTime prepareStart;
        private LocalDateTime lockStart;
        private LocalDateTime windowEnd;
        private Phase phase = Phase.IDLE;
 
        private void init(Guarantee plan, LocalDateTime target, int leadMinutes) {
            this.targetTime = target;
            this.prepareStart = target.minusMinutes(Math.max(1, leadMinutes));
            int lockMinutes = Math.min(lockDurationMinutes, leadMinutes);
            this.lockStart = target.minusMinutes(Math.max(1, lockMinutes));
            this.windowEnd = target.plusMinutes(Math.max(1, windowDurationMinutes));
            this.phase = Phase.IDLE;
            log.info("Guarantee[{}] next target {} prepare@{} lock@{} windowEnd@{}",
                    plan.getName(), targetTime, prepareStart, lockStart, windowEnd);
        }
 
        private void transition(Phase targetPhase, Runnable callback) {
            if (this.phase.ordinal() >= targetPhase.ordinal()) {
                return;
            }
            callback.run();
            this.phase = targetPhase;
        }
    }
}