package com.zy.ai.service.impl;
|
|
import com.alibaba.fastjson.JSON;
|
import com.zy.ai.entity.AiDataAnalysisReport;
|
import com.zy.ai.service.*;
|
import com.zy.common.utils.RedisUtil;
|
import com.zy.core.enums.RedisKeyType;
|
import com.zy.system.entity.OperateLog;
|
import com.zy.system.service.ConfigService;
|
import com.zy.system.service.OperateLogService;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Service;
|
|
import java.util.Arrays;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.UUID;
|
|
@Slf4j
|
@Service("dataAnalysisCoordinatorService")
|
@RequiredArgsConstructor
|
public class DataAnalysisCoordinatorServiceImpl implements DataAnalysisCoordinatorService {
|
|
private static final String CONFIG_ENABLED = "aiDataAnalysisEnabled";
|
private static final String CONFIG_PERIODS = "aiDataAnalysisScheduledPeriods";
|
private static final int RUNNING_LOCK_SECONDS = 30 * 60;
|
private static final long SYSTEM_USER_ID = 9527L;
|
|
private final ConfigService configService;
|
private final DataAnalysisAgentService dataAnalysisAgentService;
|
private final AiDataAnalysisReportService aiDataAnalysisReportService;
|
private final DataAnalysisFileStorageService dataAnalysisFileStorageService;
|
private final DataAnalysisUploadService dataAnalysisUploadService;
|
private final RedisUtil redisUtil;
|
private final OperateLogService operateLogService;
|
|
@Override
|
public boolean isEnabled() {
|
String value = configService.getConfigValue(CONFIG_ENABLED, "0");
|
return "1".equals(value.trim());
|
}
|
|
@Override
|
public void setEnabled(boolean enabled) {
|
configService.saveConfigValue(CONFIG_ENABLED, enabled ? "1" : "0");
|
configService.refreshSystemConfigCache();
|
}
|
|
@Override
|
public DataAnalysisCoordinatorResult runAnalysisIfEligible() {
|
if (!isEnabled()) {
|
return DataAnalysisCoordinatorResult.skipped("disabled");
|
}
|
|
String periods = configService.getConfigValue(CONFIG_PERIODS, "YESTERDAY");
|
List<String> periodList = Arrays.stream(periods.split(","))
|
.map(String::trim)
|
.filter(s -> !s.isEmpty())
|
.toList();
|
|
if (periodList.isEmpty()) {
|
return DataAnalysisCoordinatorResult.skipped("no_configured_periods");
|
}
|
|
// Run the first configured period
|
String periodType = periodList.get(0);
|
return runWithLock("auto", periodType);
|
}
|
|
@Override
|
public DataAnalysisCoordinatorResult runManualAnalysis(String periodType) {
|
if (!isEnabled()) {
|
return DataAnalysisCoordinatorResult.skipped("disabled");
|
}
|
return runWithLock("manual", periodType);
|
}
|
|
private DataAnalysisCoordinatorResult runWithLock(String triggerType, String periodType) {
|
String lockKey = RedisKeyType.AI_DATA_ANALYSIS_RUNNING_LOCK.key;
|
String lockToken = UUID.randomUUID().toString();
|
if (!redisUtil.trySetStringIfAbsent(lockKey, lockToken, RUNNING_LOCK_SECONDS)) {
|
return DataAnalysisCoordinatorResult.skipped("running_lock_not_acquired");
|
}
|
|
Date startTime = new Date();
|
DataAnalysisAgentService.DataAnalysisAgentResult agentResult = null;
|
try {
|
agentResult = dataAnalysisAgentService.runAnalysis(periodType);
|
saveReport(triggerType, periodType, startTime, agentResult);
|
safeWriteOperateLog(triggerType, periodType, agentResult);
|
return DataAnalysisCoordinatorResult.triggered(agentResult);
|
} catch (Exception exception) {
|
log.error("Data analysis coordinator failed to run agent", exception);
|
agentResult = failedAgentResult(periodType, exception);
|
saveReport(triggerType, periodType, startTime, agentResult);
|
safeWriteOperateLog(triggerType, periodType, agentResult);
|
return DataAnalysisCoordinatorResult.triggered(agentResult);
|
} finally {
|
redisUtil.compareAndDelete(lockKey, lockToken);
|
}
|
}
|
|
private void saveReport(String triggerType, String periodType, Date startTime,
|
DataAnalysisAgentService.DataAnalysisAgentResult agentResult) {
|
try {
|
AiDataAnalysisReport report = new AiDataAnalysisReport();
|
report.setPeriodType(periodType);
|
report.setPeriodStart(resolvePeriodStart(periodType));
|
report.setPeriodEnd(resolvePeriodEnd(periodType));
|
report.setTriggerType(triggerType);
|
report.setStatus(Boolean.TRUE.equals(agentResult.getSuccess()) ? "success" : "failed");
|
report.setSummary(agentResult.getSummary());
|
report.setStructuredData(agentResult.getMcpCalls() != null ? JSON.toJSONString(agentResult.getMcpCalls()) : null);
|
report.setLlmCallCount(agentResult.getLlmCallCount());
|
report.setPromptTokens(agentResult.getPromptTokens() != null ? agentResult.getPromptTokens().intValue() : 0);
|
report.setCompletionTokens(agentResult.getCompletionTokens() != null ? agentResult.getCompletionTokens().intValue() : 0);
|
report.setTotalTokens(agentResult.getTotalTokens() != null ? agentResult.getTotalTokens().intValue() : 0);
|
report.setCreateTime(startTime);
|
report.setFinishTime(new Date());
|
|
// Save to local file
|
String filePath = dataAnalysisFileStorageService.saveReport(report);
|
report.setLocalFilePath(filePath);
|
|
// Save to DB
|
aiDataAnalysisReportService.save(report);
|
|
// Try upload
|
DataAnalysisUploadService.UploadResult uploadResult = dataAnalysisUploadService.upload(report);
|
report.setUploadStatus(uploadResult.isSuccess() ? "uploaded" : (uploadResult.isSkipped() ? "skipped" : "failed"));
|
aiDataAnalysisReportService.updateById(report);
|
} catch (Exception e) {
|
log.error("Failed to save data analysis report", e);
|
}
|
}
|
|
private Date resolvePeriodStart(String periodType) {
|
// Simplified - the agent resolves the actual range
|
return new Date();
|
}
|
|
private Date resolvePeriodEnd(String periodType) {
|
return new Date();
|
}
|
|
private void safeWriteOperateLog(String triggerType, String periodType,
|
DataAnalysisAgentService.DataAnalysisAgentResult agentResult) {
|
try {
|
String memo = "AI数据分析 " + periodType + " " + triggerType
|
+ " 结果:" + (Boolean.TRUE.equals(agentResult.getSuccess()) ? "成功" : "失败");
|
OperateLog operateLog = new OperateLog();
|
operateLog.setUserId(SYSTEM_USER_ID);
|
operateLog.setAction("AI数据分析");
|
operateLog.setRequest(memo);
|
operateLog.setCreateTime(new Date());
|
operateLogService.save(operateLog);
|
} catch (Exception e) {
|
log.warn("Failed to write operate log for data analysis", e);
|
}
|
}
|
|
private DataAnalysisAgentService.DataAnalysisAgentResult failedAgentResult(String periodType, Exception exception) {
|
DataAnalysisAgentService.DataAnalysisAgentResult result = new DataAnalysisAgentService.DataAnalysisAgentResult();
|
result.setSuccess(false);
|
result.setPeriodType(periodType);
|
result.setTriggerType("agent");
|
result.setSummary("数据分析任务执行异常: " + exception.getMessage());
|
result.setToolCallCount(0);
|
result.setLlmCallCount(0);
|
result.setPromptTokens(0L);
|
result.setCompletionTokens(0L);
|
result.setTotalTokens(0L);
|
result.setMaxRoundsReached(false);
|
return result;
|
}
|
}
|