Junjie
2026-04-27 8e8069362418d8db5179f7716d71d9fb3e4a0034
fix: make auto tune apply writes auditable atomically
3个文件已修改
277 ■■■■■ 已修改文件
src/main/java/com/zy/ai/service/impl/AutoTuneApplyServiceImpl.java 201 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/enums/RedisKeyType.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/zy/ai/service/AutoTuneApplyServiceImplTest.java 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/ai/service/impl/AutoTuneApplyServiceImpl.java
@@ -22,6 +22,8 @@
import com.zy.asrs.service.BasDualCrnpService;
import com.zy.asrs.service.BasStationService;
import com.zy.asrs.service.StationFlowCapacityService;
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,12 +37,15 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@Service("autoTuneApplyService")
public class AutoTuneApplyServiceImpl implements AutoTuneApplyService {
    // Scene code constant; its consumer is outside the visible part of this file.
    private static final String PROMPT_SCENE_CODE = "auto_tune_apply";
    // Direction marker constant; its consumer is outside the visible part of this file.
    private static final String DIRECTION_OUT = "OUT";
    // Third argument of redisUtil.trySetStringIfAbsent when taking the apply lock;
    // presumably the lock's time-to-live in seconds — TODO confirm against RedisUtil.
    private static final long APPLY_LOCK_SECONDS = 120L;
    // Failure reason recorded on every requested change when the apply lock cannot be acquired
    // (roughly: "AI auto-tuning is currently running, please retry later").
    private static final String APPLY_LOCK_BUSY_REASON = "AI自动调参正在执行,请稍后重试";
    @Autowired
    private AiAutoTuneJobService aiAutoTuneJobService;
@@ -58,6 +63,8 @@
    private StationFlowCapacityService stationFlowCapacityService;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private RedisUtil redisUtil;
    @Override
    public AutoTuneApplyResult apply(AutoTuneApplyRequest request) {
@@ -67,26 +74,114 @@
        AiAutoTuneJob job = createJob(safeRequest, dryRun, now);
        aiAutoTuneJobService.save(job);
        List<ValidatedChange> validatedChanges = validateChanges(safeRequest, dryRun, now);
        boolean hasRejectedChange = hasRejectedChange(validatedChanges);
        if (!dryRun && hasRejectedChange) {
            markAcceptedChangesAsBatchRejected(validatedChanges);
        if (dryRun) {
            return applyDryRun(safeRequest, job, now);
        }
        if (!dryRun && !hasRejectedChange) {
            try {
                applyValidatedChangesInTransaction(validatedChanges);
            } catch (RuntimeException exception) {
                markWriteFailure(validatedChanges, exception);
            }
        return applyRealWithLock(safeRequest, job, now);
    }
    private AutoTuneApplyResult applyDryRun(AutoTuneApplyRequest request, AiAutoTuneJob job, Date now) {
        List<ValidatedChange> validatedChanges = validateChanges(request, true, now);
        ApplyPersistenceResult persistenceResult = persistApplyResultInTransaction(
                job,
                request,
                validatedChanges,
                true,
                now,
                false
        );
        return buildResult(job, persistenceResult.getAuditChanges(), true);
    }
    /**
     * Runs a real (non-dry-run) apply guarded by the Redis apply lock.
     *
     * <p>Flow as written below:
     * <ul>
     *   <li>An empty or null change list is persisted immediately, without taking the lock and
     *       with {@code writeTargets=false}.</li>
     *   <li>If {@code trySetStringIfAbsent} does not acquire the lock, the request is handed to
     *       {@code rejectRealApplyForBusyLock}.</li>
     *   <li>With the lock held, the changes are validated. If any change was rejected, the
     *       accepted ones are passed to {@code markAcceptedChangesAsBatchRejected} and the outcome
     *       is persisted with {@code writeTargets=false}.</li>
     *   <li>Otherwise the outcome is persisted with {@code writeTargets=true}; a
     *       {@link RuntimeException} from that path is recorded via {@code markWriteFailure} and
     *       the outcome is persisted again with {@code writeTargets=false}.</li>
     *   <li>The lock is released in {@code finally} through {@code compareAndDelete} using the
     *       token generated for this call.</li>
     * </ul>
     *
     * @param request the apply request being processed
     * @param job     the job record created for this request
     * @param now     the timestamp shared by validation and persistence
     * @return the result built from the job and the persisted audit rows
     */
    private AutoTuneApplyResult applyRealWithLock(AutoTuneApplyRequest request, AiAutoTuneJob job, Date now) {
        // Nothing to apply: record the job outcome without locking or writing targets.
        if (request.getChanges() == null || request.getChanges().isEmpty()) {
            ApplyPersistenceResult persistenceResult = persistApplyResultInTransaction(
                    job,
                    request,
                    new ArrayList<>(),
                    false,
                    now,
                    false
            );
            return buildResult(job, persistenceResult.getAuditChanges(), false);
        }
        // NOTE(review): the next three lines use validatedChanges before it is declared and leave
        // a brace unclosed; they look like pre-change lines carried over by the diff rendering —
        // confirm they are not present in the actual file.
        List<AiAutoTuneChange> auditChanges = buildAuditChanges(job.getId(), validatedChanges, now);
        if (!auditChanges.isEmpty()) {
            aiAutoTuneChangeService.saveBatch(auditChanges);
        // A fresh random token identifies this holder so the release below only deletes the
        // key when it still carries this token.
        String lockKey = RedisKeyType.AI_AUTO_TUNE_APPLY_LOCK.key;
        String lockToken = UUID.randomUUID().toString();
        if (!redisUtil.trySetStringIfAbsent(lockKey, lockToken, APPLY_LOCK_SECONDS)) {
            return rejectRealApplyForBusyLock(request, job, now);
        }
        // NOTE(review): the next three lines reference safeRequest and dryRun, which are not in
        // scope in this method; they also look like pre-change lines from the diff rendering.
        finishJob(job, safeRequest, auditChanges, dryRun, now);
        aiAutoTuneJobService.updateById(job);
        return buildResult(job, auditChanges, dryRun);
        try {
            List<ValidatedChange> validatedChanges = validateChanges(request, false, now);
            boolean hasRejectedChange = hasRejectedChange(validatedChanges);
            if (hasRejectedChange) {
                markAcceptedChangesAsBatchRejected(validatedChanges);
            }
            // A single rejected change rejects the whole batch: persist the audit only.
            if (hasRejectedChange) {
                ApplyPersistenceResult persistenceResult = persistApplyResultInTransaction(
                        job,
                        request,
                        validatedChanges,
                        false,
                        now,
                        false
                );
                return buildResult(job, persistenceResult.getAuditChanges(), false);
            }
            try {
                // Target writes, audit rows and the job update share one transaction here.
                ApplyPersistenceResult persistenceResult = persistApplyResultInTransaction(
                        job,
                        request,
                        validatedChanges,
                        false,
                        now,
                        true
                );
                // NOTE(review): this refresh runs inside the try, after the transactional call has
                // returned; a RuntimeException thrown by the refresh would be handled below as a
                // write failure and trigger a second persist — confirm that is intended.
                refreshSystemConfigCacheIfNeeded(persistenceResult);
                return buildResult(job, persistenceResult.getAuditChanges(), false);
            } catch (RuntimeException exception) {
                // Record the failure on the changes, then persist the audit in a new transaction
                // without touching the targets.
                markWriteFailure(validatedChanges, exception);
                ApplyPersistenceResult persistenceResult = persistApplyResultInTransaction(
                        job,
                        request,
                        validatedChanges,
                        false,
                        now,
                        false
                );
                return buildResult(job, persistenceResult.getAuditChanges(), false);
            }
        } finally {
            // Runs on every exit path once the lock was acquired, including propagated exceptions.
            redisUtil.compareAndDelete(lockKey, lockToken);
        }
    }
    private AutoTuneApplyResult rejectRealApplyForBusyLock(AutoTuneApplyRequest request, AiAutoTuneJob job, Date now) {
        List<ValidatedChange> validatedChanges = buildLockBusyChanges(request);
        ApplyPersistenceResult persistenceResult = persistApplyResultInTransaction(
                job,
                request,
                validatedChanges,
                false,
                now,
                false
        );
        return buildResult(job, persistenceResult.getAuditChanges(), false);
    }
    private List<ValidatedChange> buildLockBusyChanges(AutoTuneApplyRequest request) {
        List<ValidatedChange> validatedChanges = new ArrayList<>();
        if (request.getChanges() == null || request.getChanges().isEmpty()) {
            return validatedChanges;
        }
        for (AutoTuneChangeCommand command : request.getChanges()) {
            ValidatedChange validatedChange = new ValidatedChange(command);
            validatedChange.fail(APPLY_LOCK_BUSY_REASON);
            validatedChanges.add(validatedChange);
        }
        return validatedChanges;
    }
    @Override
@@ -279,12 +374,41 @@
        return cooldownExpireTime;
    }
    private void applyValidatedChangesInTransaction(List<ValidatedChange> validatedChanges) {
    /**
     * Runs {@code persistApplyResult} inside a {@link TransactionTemplate} built from the injected
     * transaction manager and returns its result.
     *
     * @param job              the job record to finish and update
     * @param request          the apply request being processed
     * @param validatedChanges the changes to audit (and, when requested, to write)
     * @param dryRun           forwarded to {@code persistApplyResult}
     * @param now              forwarded to {@code persistApplyResult}
     * @param writeTargets     forwarded to {@code persistApplyResult}; when {@code true} the
     *                         target writes run in the same transaction callback
     * @return the value returned by {@code persistApplyResult}
     */
    private ApplyPersistenceResult persistApplyResultInTransaction(AiAutoTuneJob job,
                                                                   AutoTuneApplyRequest request,
                                                                   List<ValidatedChange> validatedChanges,
                                                                   boolean dryRun,
                                                                   Date now,
                                                                   boolean writeTargets) {
        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        // NOTE(review): the executeWithoutResult line below applies the changes in a separate
        // transaction regardless of writeTargets; it looks like the pre-change statement carried
        // over by the diff rendering — confirm it is not present in the actual file.
        transactionTemplate.executeWithoutResult(status -> applyValidatedChanges(validatedChanges));
        return transactionTemplate.execute(status -> persistApplyResult(
                job,
                request,
                validatedChanges,
                dryRun,
                now,
                writeTargets
        ));
    }
    private void applyValidatedChanges(List<ValidatedChange> validatedChanges) {
    private ApplyPersistenceResult persistApplyResult(AiAutoTuneJob job,
                                                     AutoTuneApplyRequest request,
                                                     List<ValidatedChange> validatedChanges,
                                                     boolean dryRun,
                                                     Date now,
                                                     boolean writeTargets) {
        boolean refreshConfigCache = false;
        if (writeTargets) {
            refreshConfigCache = applyValidatedChanges(validatedChanges);
        }
        List<AiAutoTuneChange> auditChanges = buildAuditChanges(job.getId(), validatedChanges, now);
        saveAuditChanges(auditChanges);
        finishJob(job, request, auditChanges, dryRun, now);
        updateJob(job);
        return new ApplyPersistenceResult(auditChanges, refreshConfigCache);
    }
    private boolean applyValidatedChanges(List<ValidatedChange> validatedChanges) {
        boolean refreshConfigCache = false;
        for (ValidatedChange validatedChange : validatedChanges) {
            if (ChangeStatus.NO_CHANGE.equals(validatedChange.getStatus())) {
@@ -302,7 +426,26 @@
                refreshConfigCache = true;
            }
        }
        if (refreshConfigCache) {
        return refreshConfigCache;
    }
    private void saveAuditChanges(List<AiAutoTuneChange> auditChanges) {
        if (auditChanges.isEmpty()) {
            return;
        }
        if (!aiAutoTuneChangeService.saveBatch(auditChanges)) {
            throw new IllegalStateException("保存调参审计失败");
        }
    }
    private void updateJob(AiAutoTuneJob job) {
        if (!aiAutoTuneJobService.updateById(job)) {
            throw new IllegalStateException("更新调参任务状态失败");
        }
    }
    private void refreshSystemConfigCacheIfNeeded(ApplyPersistenceResult persistenceResult) {
        if (persistenceResult != null && persistenceResult.isRefreshConfigCache()) {
            configService.refreshSystemConfigCache();
        }
    }
@@ -759,6 +902,24 @@
        return null;
    }
    private static class ApplyPersistenceResult {
        private final List<AiAutoTuneChange> auditChanges;
        private final boolean refreshConfigCache;
        private ApplyPersistenceResult(List<AiAutoTuneChange> auditChanges, boolean refreshConfigCache) {
            this.auditChanges = auditChanges == null ? new ArrayList<>() : auditChanges;
            this.refreshConfigCache = refreshConfigCache;
        }
        public List<AiAutoTuneChange> getAuditChanges() {
            return auditChanges;
        }
        public boolean isRefreshConfigCache() {
            return refreshConfigCache;
        }
    }
    private static class ValidatedChange {
        private final AutoTuneChangeCommand command;
        private final String targetType;
src/main/java/com/zy/core/enums/RedisKeyType.java
@@ -79,6 +79,7 @@
    CURRENT_CIRCLE_TASK_CRN_NO("current_circle_task_crn_no_"),
    MAIN_PROCESS_PSEUDOCODE("main_process_pseudocode"),
    AI_AUTO_TUNE_RUNNING_LOCK("ai_auto_tune_running_lock"),
    AI_AUTO_TUNE_APPLY_LOCK("ai_auto_tune_apply_lock"),
    AI_AUTO_TUNE_LAST_TRIGGER_GUARD("ai_auto_tune_last_trigger_guard"),
    PLANNER_SCHEDULE("planner_schedule_"),
    HIGH_PRIVILEGE_GRANT("high_privilege_grant_"),
src/test/java/com/zy/ai/service/AutoTuneApplyServiceImplTest.java
@@ -15,6 +15,8 @@
import com.zy.asrs.service.BasDualCrnpService;
import com.zy.asrs.service.BasStationService;
import com.zy.asrs.service.StationFlowCapacityService;
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
import org.junit.jupiter.api.BeforeEach;
@@ -40,9 +42,13 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -67,6 +73,8 @@
    private BasDualCrnpService basDualCrnpService;
    @Mock
    private StationFlowCapacityService stationFlowCapacityService;
    @Mock
    private RedisUtil redisUtil;
    private RecordingTransactionManager transactionManager;
    @BeforeEach
@@ -81,6 +89,7 @@
        ReflectionTestUtils.setField(service, "basDualCrnpService", basDualCrnpService);
        ReflectionTestUtils.setField(service, "stationFlowCapacityService", stationFlowCapacityService);
        ReflectionTestUtils.setField(service, "transactionManager", transactionManager);
        ReflectionTestUtils.setField(service, "redisUtil", redisUtil);
        AtomicLong jobId = new AtomicLong(100);
        when(aiAutoTuneJobService.save(any(AiAutoTuneJob.class))).thenAnswer(invocation -> {
@@ -96,6 +105,7 @@
        when(basStationService.update(any(Wrapper.class))).thenReturn(true);
        when(basCrnpService.update(any(Wrapper.class))).thenReturn(true);
        when(basDualCrnpService.update(any(Wrapper.class))).thenReturn(true);
        when(redisUtil.trySetStringIfAbsent(anyString(), anyString(), anyLong())).thenReturn(true);
    }
    @Test
@@ -311,6 +321,61 @@
        assertEquals("failed", changes.get(0).getResultStatus());
        assertTrue(changes.get(0).getRejectReason().contains("db write failed"));
        assertEquals(1, transactionManager.getRollbackCount());
        verify(redisUtil).compareAndDelete(eq(RedisKeyType.AI_AUTO_TUNE_APPLY_LOCK.key), anyString());
    }
    /**
     * When the first audit {@code saveBatch} call throws, the apply must report failure, the
     * transaction must be rolled back once, and the failure must still be audited by the second
     * {@code saveBatch} call.
     */
    @Test
    void auditSaveBatchFailureRollsBackTargetWriteAndReturnsFailedAudit() {
        when(configService.getOne(any(Wrapper.class))).thenReturn(config("conveyorStationTaskLimit", "10"));
        // First saveBatch call fails; the follow-up call that records the failure succeeds.
        when(aiAutoTuneChangeService.saveBatch(any(Collection.class)))
                .thenThrow(new IllegalStateException("audit failed"))
                .thenReturn(true);
        AutoTuneApplyResult result = service.apply(request(false, command("sys_config", null, "conveyorStationTaskLimit", "15")));
        // savedChanges()/updatedJob() return the arguments of the last recorded call.
        List<AiAutoTuneChange> changes = savedChanges();
        AiAutoTuneJob updatedJob = updatedJob();
        assertFalse(result.getSuccess());
        assertEquals("failed", updatedJob.getStatus());
        assertEquals("failed", changes.get(0).getResultStatus());
        assertTrue(changes.get(0).getRejectReason().contains("audit failed"));
        assertEquals(1, transactionManager.getRollbackCount());
        // The target write was attempted before the audit save threw.
        verify(configService).saveConfigValue("conveyorStationTaskLimit", "15");
        verify(configService, never()).refreshSystemConfigCache();
        // The apply lock must be released even on the failure path.
        verify(redisUtil).compareAndDelete(eq(RedisKeyType.AI_AUTO_TUNE_APPLY_LOCK.key), anyString());
    }
    /**
     * When {@code updateById} always reports failure, {@code apply} must propagate an
     * {@link IllegalStateException}, with two rollbacks and no commit recorded.
     */
    @Test
    void jobUpdateFailureRollsBackTargetWriteTransaction() {
        when(configService.getOne(any(Wrapper.class))).thenReturn(config("conveyorStationTaskLimit", "10"));
        // Every job update reports failure, including the one made while recording the failure.
        when(aiAutoTuneJobService.updateById(any(AiAutoTuneJob.class))).thenReturn(false);
        IllegalStateException exception = assertThrows(IllegalStateException.class,
                () -> service.apply(request(false, command("sys_config", null, "conveyorStationTaskLimit", "15"))));
        assertTrue(exception.getMessage().contains("更新调参任务状态失败"));
        // Both the write attempt and the failure-recording attempt are expected to roll back.
        assertEquals(2, transactionManager.getRollbackCount());
        assertEquals(0, transactionManager.getCommitCount());
        verify(configService).saveConfigValue("conveyorStationTaskLimit", "15");
        verify(configService, never()).refreshSystemConfigCache();
        // The apply lock must be released even though the exception propagates.
        verify(redisUtil).compareAndDelete(eq(RedisKeyType.AI_AUTO_TUNE_APPLY_LOCK.key), anyString());
    }
    /**
     * When the apply lock cannot be acquired, the request must be recorded as failed with the
     * lock-busy reason, without reading or writing config and without releasing any lock.
     */
    @Test
    void realApplyLockNotAcquiredRejectsWithoutTargetWrite() {
        // Overrides the default stub so that lock acquisition fails.
        when(redisUtil.trySetStringIfAbsent(anyString(), anyString(), anyLong())).thenReturn(false);
        AutoTuneApplyResult result = service.apply(request(false, command("sys_config", null, "conveyorStationTaskLimit", "15")));
        List<AiAutoTuneChange> changes = savedChanges();
        AiAutoTuneJob updatedJob = updatedJob();
        assertFalse(result.getSuccess());
        assertEquals("failed", updatedJob.getStatus());
        assertEquals("failed", changes.get(0).getResultStatus());
        assertTrue(changes.get(0).getRejectReason().contains("正在执行"));
        // No config lookup and no target write may happen without the lock.
        verify(configService, never()).getOne(any(Wrapper.class));
        verify(configService, never()).saveConfigValue(any(), any());
        // A lock that was never acquired must not be deleted.
        verify(redisUtil, never()).compareAndDelete(anyString(), anyString());
    }
    @Test
@@ -445,14 +510,16 @@
    /**
     * Captures the collections passed to {@code aiAutoTuneChangeService.saveBatch} and returns a
     * copy of a captured collection as a list.
     */
    private List<AiAutoTuneChange> savedChanges() {
        ArgumentCaptor<Collection<AiAutoTuneChange>> captor = ArgumentCaptor.forClass(Collection.class);
        // NOTE(review): the next two lines (single-call verify + early return) look like the
        // pre-change body carried over by the diff rendering; the three lines after them accept
        // one or more calls and return the last captured collection — confirm against the file.
        verify(aiAutoTuneChangeService).saveBatch(captor.capture());
        return new ArrayList<>(captor.getValue());
        verify(aiAutoTuneChangeService, atLeastOnce()).saveBatch(captor.capture());
        List<Collection<AiAutoTuneChange>> allValues = captor.getAllValues();
        return new ArrayList<>(allValues.get(allValues.size() - 1));
    }
    /**
     * Captures the jobs passed to {@code aiAutoTuneJobService.updateById} and returns a captured
     * job.
     */
    private AiAutoTuneJob updatedJob() {
        ArgumentCaptor<AiAutoTuneJob> captor = ArgumentCaptor.forClass(AiAutoTuneJob.class);
        // NOTE(review): the next two lines (single-call verify + early return) look like the
        // pre-change body carried over by the diff rendering; the three lines after them accept
        // one or more calls and return the last captured job — confirm against the file.
        verify(aiAutoTuneJobService).updateById(captor.capture());
        return captor.getValue();
        verify(aiAutoTuneJobService, atLeastOnce()).updateById(captor.capture());
        List<AiAutoTuneJob> allValues = captor.getAllValues();
        return allValues.get(allValues.size() - 1);
    }
    private static class RecordingTransactionManager implements PlatformTransactionManager {