package com.zy.core.task; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.core.common.Cools; import com.zy.asrs.domain.DevicePingSample; import com.zy.asrs.entity.DeviceConfig; import com.zy.asrs.service.DeviceConfigService; import com.zy.asrs.service.DevicePingFileStorageService; import jakarta.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @Component public class DevicePingScheduler { private static final Pattern LATENCY_PATTERN = Pattern.compile("(?:time|时间)\\s*[=<]?\\s*([0-9]+(?:[\\.,][0-9]+)?)\\s*(?:ms|毫秒)", Pattern.CASE_INSENSITIVE); private static final int OUTPUT_MESSAGE_LIMIT = 120; @Value("${devicePingStorage.enabled:true}") private boolean enabled; @Value("${devicePingStorage.timeoutMs:800}") private int timeoutMs; @Value("${devicePingStorage.probeCount:3}") private int probeCount; @Value("${devicePingStorage.maxParallel:8}") private int maxParallel; @Value("${devicePingStorage.packetSize:-1}") private int packetSize; @Autowired private DeviceConfigService deviceConfigService; @Autowired private DevicePingFileStorageService devicePingFileStorageService; private final AtomicBoolean running = new AtomicBoolean(false); private volatile ExecutorService executorService; private volatile ExecutorService probeExecutorService; private volatile long lastCleanupAt = 0L; @Scheduled(fixedDelayString = "${devicePingStorage.intervalMs:1000}") public void schedule() { if (!enabled) { return; } if (!running.compareAndSet(false, true)) { return; } ensureExecutor().submit(() -> { try { collectOnce(); } finally { running.set(false); } }); } private void collectOnce() { List configs = loadConfigs(); if (configs.isEmpty()) { maybeCleanup(); return; } List> futures = new ArrayList<>(); for (DeviceConfig config : configs) { futures.add(ensureProbeExecutor().submit(() -> probe(config))); } List samples = new ArrayList<>(); for (Future future : futures) { try { DevicePingSample sample = future.get(); if (sample != null) { samples.add(sample); } } catch (Exception ignored) { } } devicePingFileStorageService.appendSamples(samples); maybeCleanup(); } private List loadConfigs() { try { QueryWrapper wrapper = new QueryWrapper() .isNotNull("ip") .ne("ip", "") .orderBy(true, true, "device_type", "device_no"); return deviceConfigService.list(wrapper); } catch (Exception ignored) { return new ArrayList<>(); } } private DevicePingSample probe(DeviceConfig config) { DevicePingSample sample = new DevicePingSample(); sample.setCreateTime(new Date()); sample.setDeviceType(config.getDeviceType()); sample.setDeviceNo(config.getDeviceNo()); sample.setIp(config.getIp()); sample.setPort(config.getPort()); sample.setPacketSize(packetSize); int actualProbeCount = Math.max(1, probeCount); sample.setProbeCount(actualProbeCount); List successLatencies = new ArrayList<>(); String lastError = ""; int successCount = 0; int perProbeTimeoutMs = Math.max(100, timeoutMs / actualProbeCount); for (int i = 0; i < actualProbeCount; i++) { try { PingResult pingResult = systemPing(config.getIp(), perProbeTimeoutMs); if (pingResult.success) { successLatencies.add(pingResult.latencyMs); successCount++; } else { lastError = "第" + (i + 1) + "次探测失败"; if (!Cools.isEmpty(pingResult.message)) { lastError = lastError + "," + pingResult.message; } } } catch (Exception ex) { lastError = Cools.isEmpty(ex.getMessage()) ? ex.getClass().getSimpleName() : ex.getMessage(); } } sample.setSuccessProbeCount(successCount); sample.setReachable(successCount > 0); if (!successLatencies.isEmpty()) { long minLatency = Long.MAX_VALUE; long maxLatency = Long.MIN_VALUE; long latencySum = 0L; for (Long latency : successLatencies) { if (latency == null) { continue; } latencySum += latency; if (latency < minLatency) { minLatency = latency; } if (latency > maxLatency) { maxLatency = latency; } } long avgLatency = Math.round(latencySum * 1.0D / successLatencies.size()); sample.setLatencyMs(avgLatency); sample.setAvgLatencyMs(avgLatency); sample.setMinLatencyMs(minLatency); sample.setMaxLatencyMs(maxLatency); } else { sample.setLatencyMs(null); sample.setAvgLatencyMs(null); sample.setMinLatencyMs(null); sample.setMaxLatencyMs(null); } if (successCount == actualProbeCount) { sample.setStatus("OK"); sample.setMessage(actualProbeCount + "/" + actualProbeCount + " 次探测成功"); } else if (successCount > 0) { sample.setStatus("UNSTABLE"); sample.setMessage(successCount + "/" + actualProbeCount + " 次探测成功" + (Cools.isEmpty(lastError) ? "" : "," + lastError)); } else { sample.setStatus("TIMEOUT"); sample.setMessage(Cools.isEmpty(lastError) ? "全部探测均超时" : lastError); } return sample; } private PingResult systemPing(String ip, int timeoutMs) throws Exception { List command = buildPingCommand(ip, timeoutMs); ProcessBuilder processBuilder = new ProcessBuilder(command); processBuilder.redirectErrorStream(true); if (!isWindows()) { processBuilder.environment().put("LC_ALL", "C"); processBuilder.environment().put("LANG", "C"); } long start = System.nanoTime(); Process process = processBuilder.start(); String output; try (InputStream inputStream = process.getInputStream()) { boolean finished = process.waitFor(Math.max(1L, timeoutMs + 500L), TimeUnit.MILLISECONDS); if (!finished) { process.destroyForcibly(); return new PingResult(false, null, "超时"); } output = readFully(inputStream); } int exitCode = process.exitValue(); Long latencyMs = parseLatencyMs(output); if (exitCode == 0) { if (latencyMs == null) { latencyMs = Math.max(0L, (System.nanoTime() - start) / 1_000_000L); } return new PingResult(true, latencyMs, "成功"); } return new PingResult(false, null, extractFailureMessage(output, exitCode)); } private List buildPingCommand(String ip, int timeoutMs) { int actualPacketSize = Math.max(-1, packetSize); if (isWindows()) { List command = new ArrayList<>(Arrays.asList("ping", "-n", "1", "-w", String.valueOf(Math.max(1, timeoutMs)))); if (actualPacketSize >= 0) { command.add("-l"); command.add(String.valueOf(actualPacketSize)); } command.add(ip); return command; } if (isMac()) { List command = new ArrayList<>(Arrays.asList("ping", "-n", "-c", "1", "-W", String.valueOf(Math.max(1, timeoutMs)))); if (actualPacketSize >= 0) { command.add("-s"); command.add(String.valueOf(actualPacketSize)); } command.add(ip); return command; } int timeoutSec = Math.max(1, (int) Math.ceil(timeoutMs / 1000.0D)); List command = new ArrayList<>(Arrays.asList("ping", "-n", "-c", "1", "-W", String.valueOf(timeoutSec))); if (actualPacketSize >= 0) { command.add("-s"); command.add(String.valueOf(actualPacketSize)); } command.add(ip); return command; } private boolean isWindows() { return System.getProperty("os.name", "").toLowerCase().startsWith("windows"); } private boolean isMac() { String osName = System.getProperty("os.name", "").toLowerCase(); return osName.startsWith("mac") || osName.startsWith("darwin"); } private Long parseLatencyMs(String output) { if (Cools.isEmpty(output)) { return null; } Matcher matcher = LATENCY_PATTERN.matcher(output.replace(',', '.')); if (matcher.find()) { try { return Math.round(Double.parseDouble(matcher.group(1))); } catch (Exception ignored) { return null; } } return null; } private String extractFailureMessage(String output, int exitCode) { if (Cools.isEmpty(output)) { return "exit=" + exitCode; } String[] lines = output.split("\\R"); for (String line : lines) { String value = line == null ? "" : line.trim(); if (value.isEmpty()) { continue; } String lower = value.toLowerCase(); if (lower.contains("request timeout") || lower.contains("100.0% packet loss") || lower.contains("100% packet loss") || lower.contains("destination host unreachable") || lower.contains("could not find host") || lower.contains("name or service not known") || lower.contains("temporary failure in name resolution") || value.contains("请求超时") || value.contains("找不到主机") || value.contains("无法访问目标主机") || value.contains("100.0% 丢失")) { return trimMessage(value); } } for (String line : lines) { String value = line == null ? "" : line.trim(); if (!value.isEmpty()) { return trimMessage(value); } } return "exit=" + exitCode; } private String trimMessage(String message) { if (Cools.isEmpty(message)) { return ""; } String value = message.trim(); if (value.length() > OUTPUT_MESSAGE_LIMIT) { return value.substring(0, OUTPUT_MESSAGE_LIMIT); } return value; } private String readFully(InputStream inputStream) throws Exception { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; int length; while ((length = inputStream.read(buffer)) != -1) { outputStream.write(buffer, 0, length); } return outputStream.toString(Charset.defaultCharset()); } private ExecutorService ensureExecutor() { ExecutorService current = executorService; if (current != null && !current.isShutdown()) { return current; } synchronized (this) { current = executorService; if (current == null || current.isShutdown()) { executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("device-ping-schedule-")); } return executorService; } } private ExecutorService ensureProbeExecutor() { ExecutorService current = probeExecutorService; if (current != null && !current.isShutdown()) { return current; } synchronized (this) { current = probeExecutorService; if (current == null || current.isShutdown()) { probeExecutorService = Executors.newFixedThreadPool(Math.max(1, maxParallel), new NamedThreadFactory("device-ping-probe-")); } return probeExecutorService; } } private void maybeCleanup() { long now = System.currentTimeMillis(); if (now - lastCleanupAt < 60L * 60L * 1000L) { return; } lastCleanupAt = now; devicePingFileStorageService.cleanupExpired(); } @PreDestroy public void shutdown() { ExecutorService current = executorService; if (current != null) { current.shutdownNow(); } ExecutorService probeCurrent = probeExecutorService; if (probeCurrent != null) { probeCurrent.shutdownNow(); } } private static class NamedThreadFactory implements ThreadFactory { private final String prefix; private final AtomicInteger index = new AtomicInteger(1); private NamedThreadFactory(String prefix) { this.prefix = prefix; } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, prefix + index.getAndIncrement()); thread.setDaemon(true); return thread; } } private static class PingResult { private final boolean success; private final Long latencyMs; private final String message; private PingResult(boolean success, Long latencyMs, String message) { this.success = success; this.latencyMs = latencyMs; this.message = message; } } }