package com.zy.core.thread.impl.v5;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.common.Cools;
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;

/**
 * Supplies runtime tuning values for the V5 station command pipeline: the segment advance
 * ratio, the segment executor pool size and the segment executor queue capacity.
 *
 * <p>Values are read from the system configuration map stored in Redis under
 * {@link RedisKeyType#SYSTEM_CONFIG_MAP}. Only when that map cannot be obtained (Redis
 * unavailable, read error, or the stored object is not a {@link HashMap}) are the values
 * looked up one by one through {@link ConfigService}. Every value is normalized into a
 * bounded range and falls back to a built-in default when missing or unparsable.
 *
 * <p>The normalized values are held in an immutable snapshot that is reloaded once it is
 * older than its own refresh interval. The snapshot reference is {@code volatile} and the
 * reload runs inside {@code synchronized (this)}.
 */
@Slf4j
@Component
public class StationV5RuntimeConfigProvider {

    private static final String CFG_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
    private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
    private static final String CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY = "stationV5SegmentExecutorQueueCapacity";
    private static final String CFG_CONFIG_REFRESH_SECONDS = "stationCommandConfigRefreshSeconds";

    private static final double DEFAULT_SEGMENT_ADVANCE_RATIO = 0.3d;
    private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
    private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
    private static final int DEFAULT_CONFIG_REFRESH_SECONDS = 30;

    @Autowired(required = false)
    private RedisUtil redisUtil;

    @Autowired(required = false)
    private ConfigService configService;

    // loadedAtMs = 0 marks the initial snapshot as stale, so the first getter call reloads it.
    private volatile CacheSnapshot cacheSnapshot = new CacheSnapshot(
            DEFAULT_SEGMENT_ADVANCE_RATIO,
            DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE,
            DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY,
            DEFAULT_CONFIG_REFRESH_SECONDS,
            0L
    );

    /**
     * Returns the segment advance ratio.
     *
     * @return a value in {@code [0, 1]}
     */
    public double getSegmentAdvanceRatio() {
        return loadSnapshot().segmentAdvanceRatio;
    }

    /**
     * Returns the segment executor pool size.
     *
     * @return a value in {@code [16, 512]}
     */
    public int getSegmentExecutorPoolSize() {
        return loadSnapshot().segmentExecutorPoolSize;
    }

    /**
     * Returns the segment executor queue capacity.
     *
     * @return a value in {@code [64, 4096]}
     */
    public int getSegmentExecutorQueueCapacity() {
        return loadSnapshot().segmentExecutorQueueCapacity;
    }

    /**
     * Returns the current snapshot, reloading it first when it is older than its refresh
     * interval. Freshness is re-checked after the lock is acquired so that callers which
     * waited on the lock reuse the snapshot the lock holder just loaded.
     */
    private CacheSnapshot loadSnapshot() {
        CacheSnapshot snapshot = cacheSnapshot;
        if (isFresh(snapshot, System.currentTimeMillis())) {
            return snapshot;
        }
        synchronized (this) {
            snapshot = cacheSnapshot;
            long now = System.currentTimeMillis();
            if (isFresh(snapshot, now)) {
                return snapshot;
            }
            CacheSnapshot refreshed = refreshSnapshot(now);
            cacheSnapshot = refreshed;
            return refreshed;
        }
    }

    /** Tells whether {@code snapshot} is still within its own refresh interval at {@code now}. */
    private boolean isFresh(CacheSnapshot snapshot, long now) {
        return now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L;
    }

    /**
     * Builds a new snapshot stamped with {@code now}.
     *
     * <p>When the Redis configuration map is available it is the only source; keys missing
     * from it resolve to the built-in defaults. When the map is unavailable, each value is
     * read from the database instead.
     *
     * @param now timestamp in epoch milliseconds recorded as the snapshot load time
     */
    private CacheSnapshot refreshSnapshot(long now) {
        HashMap<?, ?> systemConfigMap = readSystemConfigMap();

        int refreshSeconds = normalizeRefreshSeconds(readConfigText(systemConfigMap, CFG_CONFIG_REFRESH_SECONDS));
        double segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigText(systemConfigMap, CFG_SEGMENT_ADVANCE_RATIO));
        int poolSize = normalizePoolSize(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_POOL_SIZE));
        int queueCapacity = normalizeQueueCapacity(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY));

        if (systemConfigMap == null) {
            // No cached map at all: consult the database, keeping the values computed above
            // as the fallback for entries the database cannot provide.
            segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigTextFromDb(CFG_SEGMENT_ADVANCE_RATIO), segmentAdvanceRatio);
            poolSize = normalizePoolSize(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_POOL_SIZE), poolSize);
            queueCapacity = normalizeQueueCapacity(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY), queueCapacity);
            refreshSeconds = normalizeRefreshSeconds(readConfigTextFromDb(CFG_CONFIG_REFRESH_SECONDS), refreshSeconds);
        }
        return new CacheSnapshot(segmentAdvanceRatio, poolSize, queueCapacity, refreshSeconds, now);
    }

    /**
     * Reads the system configuration map from Redis.
     *
     * @return the cached map, or {@code null} when Redis is not injected, the read fails,
     *         or the stored object is not a {@link HashMap}
     */
    private HashMap<?, ?> readSystemConfigMap() {
        if (redisUtil == null) {
            return null;
        }
        try {
            Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
            if (systemConfigMapObj instanceof HashMap) {
                return (HashMap<?, ?>) systemConfigMapObj;
            }
        } catch (Exception e) {
            log.warn("加载 V5 输送运行时配置缓存失败,fallback to db. reason=redis-read-error", e);
        }
        return null;
    }

    /**
     * Looks up {@code code} in the cached configuration map.
     *
     * <p>The entry is converted with {@link String#valueOf(Object)} rather than cast, so a
     * non-String cached value cannot raise a {@link ClassCastException} during refresh.
     *
     * @return the entry as text, or {@code null} when the map, the code or the entry is absent
     */
    private String readConfigText(HashMap<?, ?> systemConfigMap, String code) {
        if (systemConfigMap == null || code == null) {
            return null;
        }
        Object value = systemConfigMap.get(code);
        return value == null ? null : String.valueOf(value);
    }

    /**
     * Looks up the configuration row whose {@code code} column equals {@code code}.
     *
     * @return the row's value, or {@code null} when the service is not injected, no row
     *         matches, or the query fails (the failure is logged, not propagated)
     */
    private String readConfigTextFromDb(String code) {
        if (configService == null || code == null) {
            return null;
        }
        try {
            Config config = configService.getOne(new QueryWrapper<Config>().eq("code", code));
            return config == null ? null : config.getValue();
        } catch (Exception e) {
            log.warn("加载 V5 输送运行时配置数据库失败,code={}", code, e);
            return null;
        }
    }

    private double normalizeSegmentAdvanceRatio(String valueText) {
        return normalizeSegmentAdvanceRatio(valueText, DEFAULT_SEGMENT_ADVANCE_RATIO);
    }

    /**
     * Parses a ratio and clamps it into {@code [0, 1]}.
     *
     * <p>Accepted forms: a plain ratio ({@code "0.3"}), an explicit percentage
     * ({@code "30%"}), or a bare number in {@code (1, 100]} which is read as a percentage.
     *
     * @param valueText    raw configuration text, may be {@code null}
     * @param defaultValue value returned when the text is missing, blank, unparsable or NaN
     */
    private double normalizeSegmentAdvanceRatio(String valueText, double defaultValue) {
        if (valueText == null) {
            return defaultValue;
        }
        String text = valueText.trim();
        if (text.isEmpty()) {
            return defaultValue;
        }
        boolean explicitPercent = text.endsWith("%");
        if (explicitPercent) {
            text = text.substring(0, text.length() - 1).trim();
        }

        double ratio;
        try {
            ratio = Double.parseDouble(text);
        } catch (NumberFormatException ignored) {
            // Unparsable text is treated the same as a missing value.
            return defaultValue;
        }
        // parseDouble accepts "NaN"; every comparison below is false for NaN, so without
        // this guard NaN would be returned and cached as the ratio.
        if (Double.isNaN(ratio)) {
            return defaultValue;
        }

        if (explicitPercent) {
            // An explicit percent sign always means "per hundred", so "1%" is 0.01, not 1.0.
            ratio = ratio / 100d;
        } else if (ratio > 1d && ratio <= 100d) {
            // A bare number above 1 cannot be a ratio; read it as a percentage.
            ratio = ratio / 100d;
        }

        if (ratio < 0d) {
            return 0d;
        }
        if (ratio > 1d) {
            return 1d;
        }
        return ratio;
    }

    private int normalizePoolSize(String valueText) {
        return normalizePoolSize(valueText, DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE);
    }

    /** Parses the pool size and clamps it into {@code [16, 512]}. */
    private int normalizePoolSize(String valueText, int defaultValue) {
        int configured = parsePositiveInt(valueText, defaultValue);
        if (configured < 16) {
            return 16;
        }
        return Math.min(configured, 512);
    }

    private int normalizeQueueCapacity(String valueText) {
        return normalizeQueueCapacity(valueText, DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY);
    }

    /** Parses the queue capacity and clamps it into {@code [64, 4096]}. */
    private int normalizeQueueCapacity(String valueText, int defaultValue) {
        int configured = parsePositiveInt(valueText, defaultValue);
        if (configured < 64) {
            return 64;
        }
        return Math.min(configured, 4096);
    }

    private int normalizeRefreshSeconds(String valueText) {
        return normalizeRefreshSeconds(valueText, DEFAULT_CONFIG_REFRESH_SECONDS);
    }

    /** Parses the refresh interval in seconds and clamps it into {@code [5, 300]}. */
    private int normalizeRefreshSeconds(String valueText, int defaultValue) {
        int configured = parsePositiveInt(valueText, defaultValue);
        if (configured < 5) {
            return 5;
        }
        return Math.min(configured, 300);
    }

    /**
     * Parses {@code valueText} as an {@code int}.
     *
     * <p>Despite the name, zero and negative numbers are returned as parsed; every caller
     * in this class clamps the result to its own lower bound afterwards.
     *
     * @return the parsed number, or {@code defaultValue} when the text is missing, empty
     *         or not a valid integer
     */
    private int parsePositiveInt(String valueText, int defaultValue) {
        if (valueText == null || Cools.isEmpty(valueText)) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(valueText.trim());
        } catch (NumberFormatException ignored) {
            // Unparsable text is treated the same as a missing value.
            return defaultValue;
        }
    }

    /** Immutable set of normalized values together with the time they were loaded. */
    private static final class CacheSnapshot {

        private final double segmentAdvanceRatio;
        private final int segmentExecutorPoolSize;
        private final int segmentExecutorQueueCapacity;
        private final int refreshSeconds;
        private final long loadedAtMs;

        private CacheSnapshot(double segmentAdvanceRatio,
                              int segmentExecutorPoolSize,
                              int segmentExecutorQueueCapacity,
                              int refreshSeconds,
                              long loadedAtMs) {
            this.segmentAdvanceRatio = segmentAdvanceRatio;
            this.segmentExecutorPoolSize = segmentExecutorPoolSize;
            this.segmentExecutorQueueCapacity = segmentExecutorQueueCapacity;
            this.refreshSeconds = refreshSeconds;
            this.loadedAtMs = loadedAtMs;
        }
    }
}