package com.zy.core.task;
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.core.common.Cools;
|
import com.zy.asrs.domain.DevicePingSample;
|
import com.zy.asrs.entity.DeviceConfig;
|
import com.zy.asrs.service.DeviceConfigService;
|
import com.zy.asrs.service.DevicePingFileStorageService;
|
import jakarta.annotation.PreDestroy;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.io.ByteArrayOutputStream;
|
import java.io.InputStream;
|
import java.nio.charset.Charset;
|
import java.util.ArrayList;
|
import java.util.Arrays;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.Future;
|
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.regex.Matcher;
|
import java.util.regex.Pattern;
|
|
@Component
|
public class DevicePingScheduler {
|
|
private static final Pattern LATENCY_PATTERN = Pattern.compile("(?:time|时间)\\s*[=<]?\\s*([0-9]+(?:[\\.,][0-9]+)?)\\s*(?:ms|毫秒)", Pattern.CASE_INSENSITIVE);
|
private static final int OUTPUT_MESSAGE_LIMIT = 120;
|
|
@Value("${devicePingStorage.enabled:true}")
|
private boolean enabled;
|
|
@Value("${devicePingStorage.timeoutMs:800}")
|
private int timeoutMs;
|
|
@Value("${devicePingStorage.probeCount:3}")
|
private int probeCount;
|
|
@Value("${devicePingStorage.maxParallel:8}")
|
private int maxParallel;
|
|
@Value("${devicePingStorage.packetSize:-1}")
|
private int packetSize;
|
|
@Autowired
|
private DeviceConfigService deviceConfigService;
|
|
@Autowired
|
private DevicePingFileStorageService devicePingFileStorageService;
|
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
private volatile ExecutorService executorService;
|
private volatile ExecutorService probeExecutorService;
|
private volatile long lastCleanupAt = 0L;
|
|
@Scheduled(fixedDelayString = "${devicePingStorage.intervalMs:1000}")
|
public void schedule() {
|
if (!enabled) {
|
return;
|
}
|
if (!running.compareAndSet(false, true)) {
|
return;
|
}
|
ensureExecutor().submit(() -> {
|
try {
|
collectOnce();
|
} finally {
|
running.set(false);
|
}
|
});
|
}
|
|
private void collectOnce() {
|
List<DeviceConfig> configs = loadConfigs();
|
if (configs.isEmpty()) {
|
maybeCleanup();
|
return;
|
}
|
|
List<Future<DevicePingSample>> futures = new ArrayList<>();
|
for (DeviceConfig config : configs) {
|
futures.add(ensureProbeExecutor().submit(() -> probe(config)));
|
}
|
|
List<DevicePingSample> samples = new ArrayList<>();
|
for (Future<DevicePingSample> future : futures) {
|
try {
|
DevicePingSample sample = future.get();
|
if (sample != null) {
|
samples.add(sample);
|
}
|
} catch (Exception ignored) {
|
}
|
}
|
devicePingFileStorageService.appendSamples(samples);
|
maybeCleanup();
|
}
|
|
private List<DeviceConfig> loadConfigs() {
|
try {
|
QueryWrapper<DeviceConfig> wrapper = new QueryWrapper<DeviceConfig>()
|
.isNotNull("ip")
|
.ne("ip", "")
|
.orderBy(true, true, "device_type", "device_no");
|
return deviceConfigService.list(wrapper);
|
} catch (Exception ignored) {
|
return new ArrayList<>();
|
}
|
}
|
|
private DevicePingSample probe(DeviceConfig config) {
|
DevicePingSample sample = new DevicePingSample();
|
sample.setCreateTime(new Date());
|
sample.setDeviceType(config.getDeviceType());
|
sample.setDeviceNo(config.getDeviceNo());
|
sample.setIp(config.getIp());
|
sample.setPort(config.getPort());
|
sample.setPacketSize(packetSize);
|
int actualProbeCount = Math.max(1, probeCount);
|
sample.setProbeCount(actualProbeCount);
|
|
List<Long> successLatencies = new ArrayList<>();
|
String lastError = "";
|
int successCount = 0;
|
int perProbeTimeoutMs = Math.max(100, timeoutMs / actualProbeCount);
|
|
for (int i = 0; i < actualProbeCount; i++) {
|
try {
|
PingResult pingResult = systemPing(config.getIp(), perProbeTimeoutMs);
|
if (pingResult.success) {
|
successLatencies.add(pingResult.latencyMs);
|
successCount++;
|
} else {
|
lastError = "第" + (i + 1) + "次探测失败";
|
if (!Cools.isEmpty(pingResult.message)) {
|
lastError = lastError + "," + pingResult.message;
|
}
|
}
|
} catch (Exception ex) {
|
lastError = Cools.isEmpty(ex.getMessage()) ? ex.getClass().getSimpleName() : ex.getMessage();
|
}
|
}
|
|
sample.setSuccessProbeCount(successCount);
|
sample.setReachable(successCount > 0);
|
if (!successLatencies.isEmpty()) {
|
long minLatency = Long.MAX_VALUE;
|
long maxLatency = Long.MIN_VALUE;
|
long latencySum = 0L;
|
for (Long latency : successLatencies) {
|
if (latency == null) {
|
continue;
|
}
|
latencySum += latency;
|
if (latency < minLatency) {
|
minLatency = latency;
|
}
|
if (latency > maxLatency) {
|
maxLatency = latency;
|
}
|
}
|
long avgLatency = Math.round(latencySum * 1.0D / successLatencies.size());
|
sample.setLatencyMs(avgLatency);
|
sample.setAvgLatencyMs(avgLatency);
|
sample.setMinLatencyMs(minLatency);
|
sample.setMaxLatencyMs(maxLatency);
|
} else {
|
sample.setLatencyMs(null);
|
sample.setAvgLatencyMs(null);
|
sample.setMinLatencyMs(null);
|
sample.setMaxLatencyMs(null);
|
}
|
|
if (successCount == actualProbeCount) {
|
sample.setStatus("OK");
|
sample.setMessage(actualProbeCount + "/" + actualProbeCount + " 次探测成功");
|
} else if (successCount > 0) {
|
sample.setStatus("UNSTABLE");
|
sample.setMessage(successCount + "/" + actualProbeCount + " 次探测成功" + (Cools.isEmpty(lastError) ? "" : "," + lastError));
|
} else {
|
sample.setStatus("TIMEOUT");
|
sample.setMessage(Cools.isEmpty(lastError) ? "全部探测均超时" : lastError);
|
}
|
return sample;
|
}
|
|
private PingResult systemPing(String ip, int timeoutMs) throws Exception {
|
List<String> command = buildPingCommand(ip, timeoutMs);
|
ProcessBuilder processBuilder = new ProcessBuilder(command);
|
processBuilder.redirectErrorStream(true);
|
if (!isWindows()) {
|
processBuilder.environment().put("LC_ALL", "C");
|
processBuilder.environment().put("LANG", "C");
|
}
|
|
long start = System.nanoTime();
|
Process process = processBuilder.start();
|
String output;
|
try (InputStream inputStream = process.getInputStream()) {
|
boolean finished = process.waitFor(Math.max(1L, timeoutMs + 500L), TimeUnit.MILLISECONDS);
|
if (!finished) {
|
process.destroyForcibly();
|
return new PingResult(false, null, "超时");
|
}
|
output = readFully(inputStream);
|
}
|
|
int exitCode = process.exitValue();
|
Long latencyMs = parseLatencyMs(output);
|
if (exitCode == 0) {
|
if (latencyMs == null) {
|
latencyMs = Math.max(0L, (System.nanoTime() - start) / 1_000_000L);
|
}
|
return new PingResult(true, latencyMs, "成功");
|
}
|
return new PingResult(false, null, extractFailureMessage(output, exitCode));
|
}
|
|
private List<String> buildPingCommand(String ip, int timeoutMs) {
|
int actualPacketSize = Math.max(-1, packetSize);
|
if (isWindows()) {
|
List<String> command = new ArrayList<>(Arrays.asList("ping", "-n", "1", "-w", String.valueOf(Math.max(1, timeoutMs))));
|
if (actualPacketSize >= 0) {
|
command.add("-l");
|
command.add(String.valueOf(actualPacketSize));
|
}
|
command.add(ip);
|
return command;
|
}
|
if (isMac()) {
|
List<String> command = new ArrayList<>(Arrays.asList("ping", "-n", "-c", "1", "-W", String.valueOf(Math.max(1, timeoutMs))));
|
if (actualPacketSize >= 0) {
|
command.add("-s");
|
command.add(String.valueOf(actualPacketSize));
|
}
|
command.add(ip);
|
return command;
|
}
|
int timeoutSec = Math.max(1, (int) Math.ceil(timeoutMs / 1000.0D));
|
List<String> command = new ArrayList<>(Arrays.asList("ping", "-n", "-c", "1", "-W", String.valueOf(timeoutSec)));
|
if (actualPacketSize >= 0) {
|
command.add("-s");
|
command.add(String.valueOf(actualPacketSize));
|
}
|
command.add(ip);
|
return command;
|
}
|
|
private boolean isWindows() {
|
return System.getProperty("os.name", "").toLowerCase().startsWith("windows");
|
}
|
|
private boolean isMac() {
|
String osName = System.getProperty("os.name", "").toLowerCase();
|
return osName.startsWith("mac") || osName.startsWith("darwin");
|
}
|
|
private Long parseLatencyMs(String output) {
|
if (Cools.isEmpty(output)) {
|
return null;
|
}
|
Matcher matcher = LATENCY_PATTERN.matcher(output.replace(',', '.'));
|
if (matcher.find()) {
|
try {
|
return Math.round(Double.parseDouble(matcher.group(1)));
|
} catch (Exception ignored) {
|
return null;
|
}
|
}
|
return null;
|
}
|
|
private String extractFailureMessage(String output, int exitCode) {
|
if (Cools.isEmpty(output)) {
|
return "exit=" + exitCode;
|
}
|
String[] lines = output.split("\\R");
|
for (String line : lines) {
|
String value = line == null ? "" : line.trim();
|
if (value.isEmpty()) {
|
continue;
|
}
|
String lower = value.toLowerCase();
|
if (lower.contains("request timeout")
|
|| lower.contains("100.0% packet loss")
|
|| lower.contains("100% packet loss")
|
|| lower.contains("destination host unreachable")
|
|| lower.contains("could not find host")
|
|| lower.contains("name or service not known")
|
|| lower.contains("temporary failure in name resolution")
|
|| value.contains("请求超时")
|
|| value.contains("找不到主机")
|
|| value.contains("无法访问目标主机")
|
|| value.contains("100.0% 丢失")) {
|
return trimMessage(value);
|
}
|
}
|
for (String line : lines) {
|
String value = line == null ? "" : line.trim();
|
if (!value.isEmpty()) {
|
return trimMessage(value);
|
}
|
}
|
return "exit=" + exitCode;
|
}
|
|
private String trimMessage(String message) {
|
if (Cools.isEmpty(message)) {
|
return "";
|
}
|
String value = message.trim();
|
if (value.length() > OUTPUT_MESSAGE_LIMIT) {
|
return value.substring(0, OUTPUT_MESSAGE_LIMIT);
|
}
|
return value;
|
}
|
|
private String readFully(InputStream inputStream) throws Exception {
|
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
byte[] buffer = new byte[1024];
|
int length;
|
while ((length = inputStream.read(buffer)) != -1) {
|
outputStream.write(buffer, 0, length);
|
}
|
return outputStream.toString(Charset.defaultCharset());
|
}
|
|
private ExecutorService ensureExecutor() {
|
ExecutorService current = executorService;
|
if (current != null && !current.isShutdown()) {
|
return current;
|
}
|
synchronized (this) {
|
current = executorService;
|
if (current == null || current.isShutdown()) {
|
executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("device-ping-schedule-"));
|
}
|
return executorService;
|
}
|
}
|
|
private ExecutorService ensureProbeExecutor() {
|
ExecutorService current = probeExecutorService;
|
if (current != null && !current.isShutdown()) {
|
return current;
|
}
|
synchronized (this) {
|
current = probeExecutorService;
|
if (current == null || current.isShutdown()) {
|
probeExecutorService = Executors.newFixedThreadPool(Math.max(1, maxParallel), new NamedThreadFactory("device-ping-probe-"));
|
}
|
return probeExecutorService;
|
}
|
}
|
|
private void maybeCleanup() {
|
long now = System.currentTimeMillis();
|
if (now - lastCleanupAt < 60L * 60L * 1000L) {
|
return;
|
}
|
lastCleanupAt = now;
|
devicePingFileStorageService.cleanupExpired();
|
}
|
|
@PreDestroy
|
public void shutdown() {
|
ExecutorService current = executorService;
|
if (current != null) {
|
current.shutdownNow();
|
}
|
ExecutorService probeCurrent = probeExecutorService;
|
if (probeCurrent != null) {
|
probeCurrent.shutdownNow();
|
}
|
}
|
|
private static class NamedThreadFactory implements ThreadFactory {
|
|
private final String prefix;
|
private final AtomicInteger index = new AtomicInteger(1);
|
|
private NamedThreadFactory(String prefix) {
|
this.prefix = prefix;
|
}
|
|
@Override
|
public Thread newThread(Runnable runnable) {
|
Thread thread = new Thread(runnable, prefix + index.getAndIncrement());
|
thread.setDaemon(true);
|
return thread;
|
}
|
}
|
|
private static class PingResult {
|
|
private final boolean success;
|
private final Long latencyMs;
|
private final String message;
|
|
private PingResult(boolean success, Long latencyMs, String message) {
|
this.success = success;
|
this.latencyMs = latencyMs;
|
this.message = message;
|
}
|
}
|
}
|