package com.zy.core.thread.impl.v5;
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.common.Cools;
import com.zy.common.utils.RedisUtil;
import com.zy.core.enums.RedisKeyType;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
|
|
@Slf4j
|
@Component
|
public class StationV5RuntimeConfigProvider {
|
|
private static final String CFG_SEGMENT_ADVANCE_RATIO = "stationCommandSegmentAdvanceRatio";
|
private static final String CFG_SEGMENT_EXECUTOR_POOL_SIZE = "stationV5SegmentExecutorPoolSize";
|
private static final String CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY = "stationV5SegmentExecutorQueueCapacity";
|
private static final String CFG_CONFIG_REFRESH_SECONDS = "stationCommandConfigRefreshSeconds";
|
|
private static final double DEFAULT_SEGMENT_ADVANCE_RATIO = 0.3d;
|
private static final int DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE = 128;
|
private static final int DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY = 512;
|
private static final int DEFAULT_CONFIG_REFRESH_SECONDS = 30;
|
|
@Autowired(required = false)
|
private RedisUtil redisUtil;
|
@Autowired(required = false)
|
private ConfigService configService;
|
|
private volatile CacheSnapshot cacheSnapshot = new CacheSnapshot(
|
DEFAULT_SEGMENT_ADVANCE_RATIO,
|
DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE,
|
DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY,
|
DEFAULT_CONFIG_REFRESH_SECONDS,
|
0L
|
);
|
|
public double getSegmentAdvanceRatio() {
|
return loadSnapshot().segmentAdvanceRatio;
|
}
|
|
public int getSegmentExecutorPoolSize() {
|
return loadSnapshot().segmentExecutorPoolSize;
|
}
|
|
public int getSegmentExecutorQueueCapacity() {
|
return loadSnapshot().segmentExecutorQueueCapacity;
|
}
|
|
private CacheSnapshot loadSnapshot() {
|
CacheSnapshot snapshot = cacheSnapshot;
|
long now = System.currentTimeMillis();
|
if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) {
|
return snapshot;
|
}
|
synchronized (this) {
|
snapshot = cacheSnapshot;
|
now = System.currentTimeMillis();
|
if (now - snapshot.loadedAtMs <= snapshot.refreshSeconds * 1000L) {
|
return snapshot;
|
}
|
CacheSnapshot refreshed = refreshSnapshot(now);
|
cacheSnapshot = refreshed;
|
return refreshed;
|
}
|
}
|
|
@SuppressWarnings("unchecked")
|
private CacheSnapshot refreshSnapshot(long now) {
|
HashMap<String, String> systemConfigMap = null;
|
try {
|
Object systemConfigMapObj = redisUtil == null ? null : redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
|
if (systemConfigMapObj instanceof HashMap) {
|
systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
|
}
|
} catch (Exception e) {
|
log.warn("加载 V5 输送运行时配置缓存失败,fallback to db. reason=redis-read-error", e);
|
}
|
|
int refreshSeconds = normalizeRefreshSeconds(readConfigText(systemConfigMap, CFG_CONFIG_REFRESH_SECONDS));
|
double segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigText(systemConfigMap, CFG_SEGMENT_ADVANCE_RATIO));
|
int poolSize = normalizePoolSize(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_POOL_SIZE));
|
int queueCapacity = normalizeQueueCapacity(readConfigText(systemConfigMap, CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY));
|
|
if (systemConfigMap == null) {
|
segmentAdvanceRatio = normalizeSegmentAdvanceRatio(readConfigTextFromDb(CFG_SEGMENT_ADVANCE_RATIO), segmentAdvanceRatio);
|
poolSize = normalizePoolSize(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_POOL_SIZE), poolSize);
|
queueCapacity = normalizeQueueCapacity(readConfigTextFromDb(CFG_SEGMENT_EXECUTOR_QUEUE_CAPACITY), queueCapacity);
|
refreshSeconds = normalizeRefreshSeconds(readConfigTextFromDb(CFG_CONFIG_REFRESH_SECONDS), refreshSeconds);
|
}
|
return new CacheSnapshot(segmentAdvanceRatio, poolSize, queueCapacity, refreshSeconds, now);
|
}
|
|
private String readConfigText(HashMap<String, String> systemConfigMap, String code) {
|
if (systemConfigMap == null || code == null) {
|
return null;
|
}
|
return systemConfigMap.get(code);
|
}
|
|
private String readConfigTextFromDb(String code) {
|
if (configService == null || code == null) {
|
return null;
|
}
|
try {
|
Config config = configService.getOne(new QueryWrapper<Config>().eq("code", code));
|
return config == null ? null : config.getValue();
|
} catch (Exception e) {
|
log.warn("加载 V5 输送运行时配置数据库失败,code={}", code, e);
|
return null;
|
}
|
}
|
|
private double normalizeSegmentAdvanceRatio(String valueText) {
|
return normalizeSegmentAdvanceRatio(valueText, DEFAULT_SEGMENT_ADVANCE_RATIO);
|
}
|
|
private double normalizeSegmentAdvanceRatio(String valueText, double defaultValue) {
|
if (valueText == null) {
|
return defaultValue;
|
}
|
String text = valueText.trim();
|
if (text.isEmpty()) {
|
return defaultValue;
|
}
|
if (text.endsWith("%")) {
|
text = text.substring(0, text.length() - 1).trim();
|
}
|
try {
|
double ratio = Double.parseDouble(text);
|
if (ratio > 1d && ratio <= 100d) {
|
ratio = ratio / 100d;
|
}
|
if (ratio < 0d) {
|
return 0d;
|
}
|
if (ratio > 1d) {
|
return 1d;
|
}
|
return ratio;
|
} catch (Exception ignore) {
|
return defaultValue;
|
}
|
}
|
|
private int normalizePoolSize(String valueText) {
|
return normalizePoolSize(valueText, DEFAULT_SEGMENT_EXECUTOR_POOL_SIZE);
|
}
|
|
private int normalizePoolSize(String valueText, int defaultValue) {
|
int configured = parsePositiveInt(valueText, defaultValue);
|
if (configured < 16) {
|
return 16;
|
}
|
return Math.min(configured, 512);
|
}
|
|
private int normalizeQueueCapacity(String valueText) {
|
return normalizeQueueCapacity(valueText, DEFAULT_SEGMENT_EXECUTOR_QUEUE_CAPACITY);
|
}
|
|
private int normalizeQueueCapacity(String valueText, int defaultValue) {
|
int configured = parsePositiveInt(valueText, defaultValue);
|
if (configured < 64) {
|
return 64;
|
}
|
return Math.min(configured, 4096);
|
}
|
|
private int normalizeRefreshSeconds(String valueText) {
|
return normalizeRefreshSeconds(valueText, DEFAULT_CONFIG_REFRESH_SECONDS);
|
}
|
|
private int normalizeRefreshSeconds(String valueText, int defaultValue) {
|
int configured = parsePositiveInt(valueText, defaultValue);
|
if (configured < 5) {
|
return 5;
|
}
|
return Math.min(configured, 300);
|
}
|
|
private int parsePositiveInt(String valueText, int defaultValue) {
|
if (Cools.isEmpty(valueText)) {
|
return defaultValue;
|
}
|
try {
|
return Integer.parseInt(valueText.trim());
|
} catch (Exception ignore) {
|
return defaultValue;
|
}
|
}
|
|
private static class CacheSnapshot {
|
private final double segmentAdvanceRatio;
|
private final int segmentExecutorPoolSize;
|
private final int segmentExecutorQueueCapacity;
|
private final int refreshSeconds;
|
private final long loadedAtMs;
|
|
private CacheSnapshot(double segmentAdvanceRatio,
|
int segmentExecutorPoolSize,
|
int segmentExecutorQueueCapacity,
|
int refreshSeconds,
|
long loadedAtMs) {
|
this.segmentAdvanceRatio = segmentAdvanceRatio;
|
this.segmentExecutorPoolSize = segmentExecutorPoolSize;
|
this.segmentExecutorQueueCapacity = segmentExecutorQueueCapacity;
|
this.refreshSeconds = refreshSeconds;
|
this.loadedAtMs = loadedAtMs;
|
}
|
}
|
}
|