Junjie
2 天以前 852664df1caf38831793b341edcada9dd7b6c22a
src/main/java/com/zy/core/task/DeviceLogScheduler.java
@@ -1,47 +1,34 @@
package com.zy.core.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.zy.asrs.entity.DeviceDataLog;
import com.zy.asrs.service.DeviceDataLogService;
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
@Component
public class DeviceLogScheduler {
    private static final int BASE_BATCH_SIZE = 100;
    private static final int BACKLOG_SCAN_LIMIT = 2000;
    private static final int MAX_BATCH_SIZE = 1000;
    private static final ReentrantLock FILE_OP_LOCK = new ReentrantLock();
    private static final int DEFAULT_EXPIRE_DAYS = 1;
    private static final DateTimeFormatter DAY_FORMATTER = DateTimeFormatter.ofPattern("uuuuMMdd")
            .withResolverStyle(ResolverStyle.STRICT);
    @Value("${deviceLogStorage.type}")
    private String storageType;
@@ -49,212 +36,51 @@
    private String loggingPath;
    @Value("${deviceLogStorage.expireDays}")
    private Integer expireDays;
    @Autowired
    private DeviceDataLogService deviceDataLogService;
    @Autowired
    private RedisUtil redisUtil;
    private static final ReentrantLock FILE_OP_LOCK = new ReentrantLock();
    @Scheduled(cron = "0/3 * * * * ? ")
    public void delDeviceLog() {
        if ("mysql".equals(storageType)) {
            deviceDataLogService.clearLog(expireDays == null ? 1 : expireDays);
        } else if ("file".equals(storageType)) {
            if (!FILE_OP_LOCK.tryLock()) {
                return;
            }
            try {
                clearFileLog(expireDays == null ? 1 : expireDays);
            } finally {
                FILE_OP_LOCK.unlock();
            }
        } else {
            log.error("未定义的存储类型:{}", storageType);
    @PostConstruct
    public void validateStorageMode() {
        if (!"file".equalsIgnoreCase(storageType)) {
            throw new IllegalStateException("deviceLogStorage.type 仅支持 file,当前配置为: " + storageType);
        }
        ensureLoggingDirectoryExistsOrThrow();
    }
    @Scheduled(cron = "0/3 * * * * ? ")
    public void execute() {
        Set<String> scannedKeys = redisUtil.scanKeys(RedisKeyType.DEVICE_LOG_KEY.key, BACKLOG_SCAN_LIMIT);
        if (scannedKeys == null || scannedKeys.isEmpty()) {
    @Scheduled(cron = "${deviceLogStorage.cleanupScanCron:0 0 3 * * ?}")
    public void delDeviceLog() {
        if (!FILE_OP_LOCK.tryLock()) {
            return;
        }
        Set<String> keys = selectBatchKeys(scannedKeys);
        List<Object> values = redisUtil.multiGet(keys);
        List<DeviceDataLog> list = new ArrayList<>();
        for (Object object : values) {
            if (object instanceof DeviceDataLog) {
                list.add((DeviceDataLog) object);
            }
        }
        if (!list.isEmpty()) {
            if ("mysql".equals(storageType)) {
                mysqlSave(keys, list);
            } else if ("file".equals(storageType)) {
                if (!FILE_OP_LOCK.tryLock()) {
                    return;
                }
                try {
                    fileSave(keys, list);
                } finally {
                    FILE_OP_LOCK.unlock();
                }
            } else {
                log.error("未定义的存储类型:{}", storageType);
            }
        }
    }
    private Set<String> selectBatchKeys(Set<String> scannedKeys) {
        int backlog = scannedKeys.size();
        int batchSize = resolveBatchSize(backlog);
        if (backlog <= batchSize) {
            return scannedKeys;
        }
        LinkedHashSet<String> selected = new LinkedHashSet<>();
        for (String key : scannedKeys) {
            selected.add(key);
            if (selected.size() >= batchSize) {
                break;
            }
        }
        return selected;
    }
    private int resolveBatchSize(int backlog) {
        if (backlog <= BASE_BATCH_SIZE) {
            return backlog;
        }
        int adaptive = Math.max(BASE_BATCH_SIZE, backlog / 2);
        int rounded = ((adaptive + BASE_BATCH_SIZE - 1) / BASE_BATCH_SIZE) * BASE_BATCH_SIZE;
        return Math.min(MAX_BATCH_SIZE, rounded);
    }
    private void mysqlSave(Set<String> keys, List<DeviceDataLog> list) {
        if (deviceDataLogService.saveBatch(list)) {
            redisUtil.del(keys.toArray(new String[0]));
        }
    }
    private void fileSave(Set<String> keys, List<DeviceDataLog> list) {
        try {
            Path baseDir = Paths.get(loggingPath);
            Files.createDirectories(baseDir);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            Map<String, Map<String, List<DeviceDataLog>>> group = new HashMap<>();
            for (DeviceDataLog logItem : list) {
                String datePart = sdf.format(logItem.getCreateTime() == null ? new Date() : logItem.getCreateTime());
                String prefix = buildFilePrefix(logItem, datePart);
                if (prefix == null) {
                    continue;
                }
                group.computeIfAbsent(datePart, k -> new HashMap<>())
                        .computeIfAbsent(prefix, k -> new ArrayList<>())
                        .add(logItem);
            }
            for (Map.Entry<String, Map<String, List<DeviceDataLog>>> dateEntry : group.entrySet()) {
                Path dayDir = baseDir.resolve(dateEntry.getKey());
                Files.createDirectories(dayDir);
                for (Map.Entry<String, List<DeviceDataLog>> entry : dateEntry.getValue().entrySet()) {
                    List<DeviceDataLog> logs = entry.getValue();
                    if (logs == null || logs.isEmpty()) {
                        continue;
                    }
                    DeviceDataLog firstLog = logs.get(0);
                    Path deviceDir = resolveDeviceDir(dayDir, firstLog);
                    if (deviceDir == null) {
                        continue;
                    }
                    Files.createDirectories(deviceDir);
                    String prefix = buildFilePrefix(firstLog, dateEntry.getKey());
                    if (prefix == null) {
                        continue;
                    }
                    logs.sort(Comparator.comparing(DeviceDataLog::getCreateTime, Comparator.nullsLast(Date::compareTo)));
                    int index = findStartIndex(deviceDir, prefix);
                    Path current = deviceDir.resolve(prefix + index + ".log");
                    if (!Files.exists(current)) {
                        Files.createFile(current);
                    }
                    long size = Files.size(current);
                    long max = 1024L * 1024L;
                    for (DeviceDataLog d : logs) {
                        String json = JSON.toJSONStringWithDateFormat(d, "yyyy-MM-dd HH:mm:ss.SSS", SerializerFeature.WriteDateUseDateFormat);
                        byte[] line = (json + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
                        if (size + line.length > max) {
                            index++;
                            current = deviceDir.resolve(prefix + index + ".log");
                            if (!Files.exists(current)) {
                                Files.createFile(current);
                            }
                            size = 0;
                        }
                        Files.write(current, line, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                        size += line.length;
                    }
                }
            }
            redisUtil.del(keys.toArray(new String[0]));
            ensureLoggingDirectoryExists();
            clearFileLog(resolveExpireDays());
        } finally {
            FILE_OP_LOCK.unlock();
        }
    }
    private void ensureLoggingDirectoryExistsOrThrow() {
        Path loggingRootPath = Paths.get(loggingPath);
        try {
            Files.createDirectories(loggingRootPath);
        } catch (Exception e) {
            log.error("设备日志文件存储失败", e);
            throw new IllegalStateException("初始化设备日志目录失败, path=" + loggingPath, e);
        }
    }
    private String buildFilePrefix(DeviceDataLog logItem, String datePart) {
        if (logItem == null || logItem.getType() == null || logItem.getDeviceNo() == null) {
            return null;
    private void ensureLoggingDirectoryExists() {
        try {
            Files.createDirectories(Paths.get(loggingPath));
        } catch (Exception e) {
            log.warn("初始化设备日志目录失败, path={}", loggingPath, e);
        }
        if (String.valueOf(SlaveType.Devp).equals(logItem.getType())) {
            if (logItem.getStationId() == null) {
                log.warn("跳过缺少站点号的输送设备日志, deviceNo={}, createTime={}", logItem.getDeviceNo(), logItem.getCreateTime());
                return null;
            }
            return logItem.getType() + "_" + logItem.getDeviceNo() + "_station_" + logItem.getStationId() + "_" + datePart + "_";
        }
        return logItem.getType() + "_" + logItem.getDeviceNo() + "_" + datePart + "_";
    }
    private Path resolveDeviceDir(Path dayDir, DeviceDataLog logItem) {
        if (dayDir == null || logItem == null || logItem.getType() == null || logItem.getDeviceNo() == null) {
            return null;
    private int resolveExpireDays() {
        if (expireDays == null || expireDays <= 0) {
            log.warn("deviceLogStorage.expireDays 配置无效,使用默认值: {}", DEFAULT_EXPIRE_DAYS);
            return DEFAULT_EXPIRE_DAYS;
        }
        return dayDir.resolve(logItem.getType()).resolve(String.valueOf(logItem.getDeviceNo()));
    }
    private int findStartIndex(Path baseDir, String prefix) throws Exception {
        List<Path> matched;
        try (Stream<Path> stream = Files.list(baseDir)) {
            matched = stream
                    .filter(p -> {
                        String n = p.getFileName().toString();
                        return n.startsWith(prefix) && n.endsWith(".log");
                    })
                    .collect(Collectors.toList());
        }
        int maxIdx = 0;
        for (Path p : matched) {
            String name = p.getFileName().toString();
            String suf = name.substring(prefix.length());
            if (!suf.isEmpty()) {
                try {
                    int val = Integer.parseInt(suf.replace(".log", ""));
                    if (val > maxIdx) {
                        maxIdx = val;
                    }
                } catch (NumberFormatException ignored) {
                }
            }
        }
        int candidate = maxIdx == 0 ? 1 : maxIdx;
        Path path = baseDir.resolve(prefix + candidate + ".log");
        if (Files.exists(path)) {
            long size = Files.size(path);
            if (size >= 1024L * 1024L) {
                return candidate + 1;
            }
        }
        return candidate;
        return expireDays;
    }
    private void clearFileLog(int days) {
@@ -263,42 +89,45 @@
            if (!Files.exists(baseDir)) {
                return;
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            long cutoff = System.currentTimeMillis() - (long) days * 24 * 60 * 60 * 1000;
            List<Path> dirs;
            LocalDate earliestReservedDay = LocalDate.now().minusDays(days - 1L);
            List<Path> expiredDayDirs = new ArrayList<>();
            try (Stream<Path> stream = Files.list(baseDir)) {
                dirs = stream.filter(Files::isDirectory).collect(Collectors.toList());
                stream.filter(Files::isDirectory)
                        .forEach(dayDir -> collectExpiredDayDirectory(dayDir, earliestReservedDay, expiredDayDirs));
            }
            for (Path dir : dirs) {
                String name = dir.getFileName().toString();
                if (name.length() == 8 && name.chars().allMatch(Character::isDigit)) {
                    Date d = sdf.parse(name);
                    if (d.getTime() < cutoff) {
                        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
                            @Override
                            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                                try {
                                    Files.deleteIfExists(file);
                                } catch (Exception ignored) {
                                }
                                return FileVisitResult.CONTINUE;
                            }
                            @Override
                            public FileVisitResult postVisitDirectory(Path dir, java.io.IOException exc) {
                                try {
                                    Files.deleteIfExists(dir);
                                } catch (Exception ignored) {
                                }
                                return FileVisitResult.CONTINUE;
                            }
                        });
            for (Path dayDir : expiredDayDirs) {
                Files.walkFileTree(dayDir, new SimpleFileVisitor<>() {
                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws java.io.IOException {
                        Files.deleteIfExists(file);
                        return FileVisitResult.CONTINUE;
                    }
                }
                    @Override
                    public FileVisitResult postVisitDirectory(Path dir, java.io.IOException exc) throws java.io.IOException {
                        Files.deleteIfExists(dir);
                        return FileVisitResult.CONTINUE;
                    }
                });
            }
        } catch (Exception e) {
            log.error("设备日志文件清理失败", e);
            log.error("删除设备日志文件失败, path={}", loggingPath, e);
        }
    }
    private void collectExpiredDayDirectory(Path dayDir, LocalDate earliestReservedDay, List<Path> expiredDayDirs) {
        LocalDate dayValue = parseDayDirectory(dayDir.getFileName().toString());
        if (dayValue == null || !dayValue.isBefore(earliestReservedDay)) {
            return;
        }
        expiredDayDirs.add(dayDir);
    }
    private LocalDate parseDayDirectory(String dayDirectoryName) {
        try {
            return LocalDate.parse(dayDirectoryName, DAY_FORMATTER);
        } catch (DateTimeParseException ignored) {
            return null;
        }
    }
}