Junjie
9 天以前 1c7abc2dd0e4a4f2879af4eb00fb7eb4346fbf07
#后端增加websocket
3个文件已添加
2个文件已修改
703 ■■■■■ 已修改文件
src/main/java/com/zy/asrs/service/TvDataPushService.java 194 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/asrs/websocket/TvWebSocketServer.java 186 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/common/config/AdminInterceptor.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/common/config/WebSocketConfig.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/webapp/views/wsTest/wsTest.html 319 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/asrs/service/TvDataPushService.java
New file
@@ -0,0 +1,194 @@
package com.zy.asrs.service;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.common.Cools;
import com.zy.system.entity.Announcement;
import com.zy.asrs.entity.BasStationTv;
import com.zy.asrs.entity.TvDevice;
import com.zy.asrs.entity.dto.TvDataDto;
import com.zy.asrs.entity.dto.WcsStationDto;
import com.zy.asrs.enums.RedisKeyType;
import com.zy.asrs.utils.StationUtils;
import com.zy.asrs.websocket.TvWebSocketServer;
import com.zy.common.model.WebSocketMessage;
import com.zy.common.utils.RedisUtil;
import com.zy.system.service.AnnouncementService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.*;
@Slf4j
@Service
public class TvDataPushService {
    @Autowired
    private TvWebSocketServer tvWebSocketServer;
    @Autowired
    private TvDeviceService tvDeviceService;
    @Autowired
    private BasStationTvService basStationTvService;
    @Autowired
    private StationUtils stationUtils;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private AnnouncementService announcementService;
    private static final String[] WEEK = {"星期日","星期一","星期二","星期三","星期四","星期五","星期六"};
    @Scheduled(fixedRate = 1000)
    public void pushAll() {
        if (tvWebSocketServer.getOnlineIps().isEmpty()) {
            return;
        }
        pushLedInfosAndError();
        pushLocData();
        pushLineCharts();
        pushLocDetlStatistics();
        pushAnnouncement();
        pushDate();
    }
    private void pushLedInfosAndError() {
        String manualError = "";
        Object manualErrorObj = redisUtil.get(RedisKeyType.TV_MANUAL_ERROR_MSG.key);
        if (manualErrorObj != null) {
            manualError = String.valueOf(manualErrorObj);
        }
        for (String ip : tvWebSocketServer.getOnlineIps()) {
            TvDevice tvDevice = tvDeviceService.getOne(
                    new QueryWrapper<TvDevice>().eq("ip", ip));
            if (tvDevice == null) {
                // 非注册设备(如调试页面),推送空数据和通用错误
                Map<String, Object> ledData = new HashMap<>();
                ledData.put("deviceName", "未注册设备(" + ip + ")");
                ledData.put("data", new ArrayList<>());
                tvWebSocketServer.sendMessageToDevice(ip, buildMessage("ledInfos", ledData));
                Map<String, Object> errorData = new HashMap<>();
                errorData.put("errorMsg", manualError != null ? manualError : "");
                tvWebSocketServer.sendMessageToDevice(ip, buildMessage("error", errorData));
                continue;
            }
            List<BasStationTv> relations = basStationTvService
                    .list(new QueryWrapper<BasStationTv>().eq("tv_id", tvDevice.getId()));
            List<TvDataDto> dataList = new ArrayList<>();
            Set<String> errors = new LinkedHashSet<>();
            if (manualError != null && !manualError.isEmpty()) {
                errors.add(manualError);
            }
            if (relations != null && !relations.isEmpty()) {
                for (BasStationTv relation : relations) {
                    WcsStationDto wcsStationDto = stationUtils.stationMap.get(relation.getStationId());
                    if (wcsStationDto == null || wcsStationDto.getLoading() != 1) {
                        continue;
                    }
                    String errorMsg = "";
                    if (!Cools.isEmpty(wcsStationDto.getErrorMsg())) {
                        errorMsg += wcsStationDto.getErrorMsg();
                    }
                    if (!Cools.isEmpty(wcsStationDto.getSystemWarning())) {
                        errorMsg += wcsStationDto.getSystemWarning();
                    }
                    if (!Cools.isEmpty(errorMsg)) {
                        errors.add(errorMsg);
                    }
                    TvDataDto tvDataDto = new TvDataDto();
                    tvDataDto.setStationId(wcsStationDto.getStationId());
                    tvDataDto.setTaskNo(wcsStationDto.getTaskNo());
                    tvDataDto.setBarcode(wcsStationDto.getBarcode());
                    tvDataDto.setErrorMsg(errorMsg);
                    tvDataDto.setIoType(wcsStationDto.getIoType());
                    tvDataDto.setWrkDetls(wcsStationDto.getWrkDetls());
                    tvDataDto.setErrorCode(Cools.isEmpty(errorMsg) ? 0 : 1);
                    dataList.add(tvDataDto);
                }
            }
            // 推送 ledInfos
            Map<String, Object> ledData = new HashMap<>();
            ledData.put("deviceName", tvDevice.getName());
            ledData.put("data", dataList);
            tvWebSocketServer.sendMessageToDevice(ip, buildMessage("ledInfos", ledData));
            // 推送 error
            Map<String, Object> errorData = new HashMap<>();
            errorData.put("errorMsg", String.join(";", errors));
            tvWebSocketServer.sendMessageToDevice(ip, buildMessage("error", errorData));
        }
    }
    private void pushLocData() {
        Object o = redisUtil.get(RedisKeyType.TV_LOC_DATA_DTO.key);
        String currentData = o != null ? o.toString() : "{}";
        tvWebSocketServer.sendMessageToAll(buildMessage("locData", JSON.parseObject(currentData)));
    }
    private void pushLineCharts() {
        Object o = redisUtil.get(RedisKeyType.TV_LINE_CHARTS.key);
        if (o == null) {
            return;
        }
        String currentData = o.toString();
        tvWebSocketServer.sendMessageToAll(buildMessage("lineCharts", JSON.parseObject(currentData)));
    }
    private void pushLocDetlStatistics() {
        Object o = redisUtil.get(RedisKeyType.TV_LOC_DETL_STATISTICS.key);
        if (o == null) {
            return;
        }
        String currentData = o.toString();
        tvWebSocketServer.sendMessageToAll(buildMessage("locDetlStatistics", JSON.parseArray(currentData)));
    }
    private void pushAnnouncement() {
        QueryWrapper<Announcement> wrapper = new QueryWrapper<>();
        wrapper.eq("status", 1);
        wrapper.orderBy(true, false, "create_time");
        wrapper.last("limit 5");
        List<Announcement> list = announcementService.list(wrapper);
        tvWebSocketServer.sendMessageToAll(buildMessage("announcement", list));
    }
    private void pushDate() {
        Date now = new Date();
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(now);
        Map<String, Object> dateMap = new HashMap<>();
        dateMap.put("year", calendar.get(Calendar.YEAR));
        dateMap.put("month", zerofill(String.valueOf(calendar.get(Calendar.MONTH) + 1), 2));
        dateMap.put("day", zerofill(String.valueOf(calendar.get(Calendar.DATE)), 2));
        dateMap.put("hour", zerofill(String.valueOf(calendar.get(Calendar.HOUR_OF_DAY)), 2));
        dateMap.put("minute", zerofill(String.valueOf(calendar.get(Calendar.MINUTE)), 2));
        dateMap.put("second", zerofill(String.valueOf(calendar.get(Calendar.SECOND)), 2));
        dateMap.put("week", WEEK[calendar.get(Calendar.DAY_OF_WEEK) - 1]);
        tvWebSocketServer.sendMessageToAll(buildMessage("date", dateMap));
    }
    private String buildMessage(String url, Object data) {
        WebSocketMessage message = new WebSocketMessage();
        message.setUrl(url);
        message.setData(JSON.toJSONString(data));
        return JSON.toJSONString(message);
    }
    private String zerofill(String str, int length) {
        while (str.length() < length) {
            str = "0" + str;
        }
        return str;
    }
}
src/main/java/com/zy/asrs/websocket/TvWebSocketServer.java
New file
@@ -0,0 +1,186 @@
package com.zy.asrs.websocket;
import jakarta.websocket.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.websocket.server.HandshakeRequest;
import jakarta.websocket.server.ServerEndpoint;
import jakarta.websocket.server.ServerEndpointConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.apache.tomcat.websocket.server.WsHandshakeRequest;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint(value = "/tv/socket", configurator = TvWebSocketServer.TvConfigurator.class)
@Component
public class TvWebSocketServer {
    private static final Logger log = LoggerFactory.getLogger(TvWebSocketServer.class);
    private static final ConcurrentHashMap<String, Session> SESSIONS = new ConcurrentHashMap<>();
    public static class TvConfigurator extends ServerEndpointConfig.Configurator {
        @Override
        public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
            String ip = extractIp(request);
            sec.getUserProperties().put("ip", ip);
            log.info("电视机WebSocket握手完成, requestUri: {}, clientIp: {}", request.getRequestURI(), ip);
        }
        private String extractIp(HandshakeRequest request) {
            String headerIp = extractIpFromHeaders(request.getHeaders());
            if (isValidIp(headerIp)) {
                return headerIp;
            }
            String tomcatRemoteIp = extractIpFromTomcatRequest(request);
            if (isValidIp(tomcatRemoteIp)) {
                return tomcatRemoteIp;
            }
            log.warn("电视机WebSocket握手未获取到客户端IP, headers: {}", request.getHeaders().keySet());
            return "unknown";
        }
        private String extractIpFromHeaders(Map<String, List<String>> headers) {
            if (headers == null || headers.isEmpty()) {
                return null;
            }
            String[] headerNames = {
                    "X-Forwarded-For",
                    "X-Real-IP",
                    "Proxy-Client-IP",
                    "WL-Proxy-Client-IP",
                    "HTTP_X_FORWARDED_FOR",
                    "HTTP_X_REAL_IP"
            };
            for (String headerName : headerNames) {
                String ip = firstHeaderValue(headers, headerName);
                if (isValidIp(ip)) {
                    return normalizeIp(ip.split(",")[0].trim());
                }
            }
            String remoteAddress = firstHeaderValue(headers, "remoteAddress");
            if (isValidIp(remoteAddress)) {
                return normalizeIp(remoteAddress);
            }
            return null;
        }
        private String extractIpFromTomcatRequest(HandshakeRequest request) {
            if (!(request instanceof WsHandshakeRequest wsHandshakeRequest)) {
                return null;
            }
            try {
                Field requestField = WsHandshakeRequest.class.getDeclaredField("request");
                requestField.setAccessible(true);
                HttpServletRequest httpServletRequest = (HttpServletRequest) requestField.get(wsHandshakeRequest);
                if (httpServletRequest == null) {
                    return null;
                }
                return normalizeIp(httpServletRequest.getRemoteAddr());
            } catch (Exception e) {
                log.warn("电视机WebSocket从Tomcat握手请求中提取IP失败: {}", e.getMessage());
                return null;
            }
        }
        private String firstHeaderValue(Map<String, List<String>> headers, String headerName) {
            List<String> values = headers.get(headerName);
            if (values == null || values.isEmpty()) {
                return null;
            }
            return values.get(0);
        }
        private boolean isValidIp(String ip) {
            return ip != null && !ip.isEmpty() && !"unknown".equalsIgnoreCase(ip);
        }
        private String normalizeIp(String ip) {
            if (ip == null) {
                return null;
            }
            String normalized = ip.trim();
            if (normalized.startsWith("/")) {
                normalized = normalized.substring(1);
            }
            if (normalized.startsWith("::ffff:")) {
                normalized = normalized.substring(7);
            }
            if (normalized.startsWith("[") && normalized.contains("]")) {
                normalized = normalized.substring(1, normalized.indexOf(']'));
            } else if (normalized.chars().filter(ch -> ch == ':').count() == 1) {
                int colonIdx = normalized.lastIndexOf(':');
                if (colonIdx > 0) {
                    normalized = normalized.substring(0, colonIdx);
                }
            }
            return normalized;
        }
    }
    @OnOpen
    public void onOpen(Session session) {
        String ip = getIp(session);
        SESSIONS.put(ip, session);
        log.info("电视机WebSocket连接建立, IP: {}, 当前在线数: {}", ip, SESSIONS.size());
    }
    @OnClose
    public void onClose(Session session) {
        String ip = getIp(session);
        SESSIONS.remove(ip);
        log.info("电视机WebSocket连接关闭, IP: {}, 当前在线数: {}", ip, SESSIONS.size());
    }
    @OnError
    public void onError(Session session, Throwable error) {
        String ip = getIp(session);
        SESSIONS.remove(ip);
        log.warn("电视机WebSocket传输异常, IP: {}, error: {}", ip, error.getMessage());
    }
    @OnMessage
    public void onMessage(String message, Session session) {
        // 电视机端无需发送消息,忽略
    }
    public void sendMessageToDevice(String ip, String message) {
        Session session = SESSIONS.get(ip);
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("推送消息到设备 {} 失败: {}", ip, e.getMessage());
            }
        }
    }
    public void sendMessageToAll(String message) {
        for (Map.Entry<String, Session> entry : SESSIONS.entrySet()) {
            Session session = entry.getValue();
            if (session.isOpen()) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("广播消息到设备 {} 失败: {}", entry.getKey(), e.getMessage());
                }
            }
        }
    }
    public Set<String> getOnlineIps() {
        return SESSIONS.keySet();
    }
    private String getIp(Session session) {
        Object ip = session.getUserProperties().get("ip");
        return ip != null ? ip.toString() : "unknown";
    }
}
src/main/java/com/zy/common/config/AdminInterceptor.java
@@ -45,6 +45,9 @@
        if (handler instanceof org.springframework.web.servlet.resource.ResourceHttpRequestHandler) {
            return true;
        }
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        // super账号
        String token = request.getHeader("token");
        if (token!=null) {
src/main/java/com/zy/common/config/WebSocketConfig.java
@@ -21,4 +21,3 @@
        registry.addEndpoint("/api/socket");
    }
}
src/main/webapp/views/wsTest/wsTest.html
New file
@@ -0,0 +1,319 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="utf-8">
    <title>WebSocket调试</title>
    <meta name="renderer" content="webkit">
    <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
    <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1">
    <link rel="stylesheet" href="../../static/vue/element/element.css">
    <style>
        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }
        body {
            font-family: 'Helvetica Neue', Helvetica, 'PingFang SC', 'Hiragino Sans GB', 'Microsoft YaHei', Arial, sans-serif;
            background: #f5f7fa;
            padding: 15px;
        }
        .app-container {
            background: #fff;
            border-radius: 8px;
            box-shadow: 0 2px 12px 0 rgba(0, 0, 0, 0.1);
            padding: 20px;
        }
        .toolbar {
            display: flex;
            align-items: center;
            gap: 10px;
            margin-bottom: 15px;
            flex-wrap: wrap;
        }
        .toolbar .el-input {
            width: 400px;
        }
        .status-dot {
            display: inline-block;
            width: 10px;
            height: 10px;
            border-radius: 50%;
            margin-right: 5px;
        }
        .status-dot.connected {
            background: #67c23a;
        }
        .status-dot.disconnected {
            background: #909399;
        }
        .log-panel {
            border: 1px solid #dcdfe6;
            border-radius: 4px;
            height: calc(100vh - 220px);
            overflow-y: auto;
            padding: 10px;
            background: #1e1e1e;
            color: #d4d4d4;
            font-family: Consolas, Monaco, 'Courier New', monospace;
            font-size: 13px;
            line-height: 1.6;
        }
        .log-item {
            margin-bottom: 4px;
            word-break: break-all;
            white-space: pre-wrap;
        }
        .log-item .time {
            color: #6a9955;
            margin-right: 8px;
        }
        .log-item.system {
            color: #569cd6;
        }
        .log-item.error {
            color: #f44747;
        }
        .log-item.receive {
            color: #dcdcaa;
        }
        .log-item .tag {
            display: inline-block;
            padding: 0 6px;
            border-radius: 3px;
            font-size: 12px;
            margin-right: 6px;
        }
        .tag-connect {
            background: #67c23a;
            color: #fff;
        }
        .tag-close {
            background: #909399;
            color: #fff;
        }
        .tag-error {
            background: #f56c6c;
            color: #fff;
        }
        .tag-receive {
            background: #e6a23c;
            color: #fff;
        }
        .filter-bar {
            display: flex;
            align-items: center;
            gap: 10px;
            margin-bottom: 10px;
        }
        .stats {
            color: #909399;
            font-size: 13px;
            margin-left: auto;
        }
    </style>
</head>
<body>
    <div id="app" class="app-container">
        <div class="toolbar">
            <el-input v-model="wsUrl" placeholder="WebSocket地址" size="small" :disabled="connected">
                <template slot="prepend">WS</template>
            </el-input>
            <el-button :type="connected ? 'danger' : 'primary'" size="small" @click="toggleConnection">
                {{ connected ? '断开连接' : '连接' }}
            </el-button>
            <el-button size="small" @click="clearLogs">清空日志</el-button>
            <el-button size="small" @click="autoScroll = !autoScroll">
                {{ autoScroll ? '自动滚动:开' : '自动滚动:关' }}
            </el-button>
            <span>
                <span class="status-dot" :class="connected ? 'connected' : 'disconnected'"></span>
                {{ connected ? '已连接' : '未连接' }}
            </span>
        </div>
        <div class="filter-bar">
            <el-checkbox-group v-model="filters" size="small">
                <el-checkbox-button label="system">系统</el-checkbox-button>
                <el-checkbox-button label="receive">接收</el-checkbox-button>
                <el-checkbox-button label="error">错误</el-checkbox-button>
            </el-checkbox-group>
            <el-input v-model="searchKeyword" placeholder="搜索消息内容" size="small" style="width: 250px;"
                clearable></el-input>
            <span class="stats">共 {{ filteredLogs.length }} 条</span>
        </div>
        <div class="log-panel" ref="logPanel">
            <div v-for="(log, index) in filteredLogs" :key="index" class="log-item" :class="log.type">
                <span class="time">{{ log.time }}</span>
                <span v-if="log.type === 'system'" class="tag tag-connect">系统</span>
                <span v-else-if="log.type === 'error'" class="tag tag-error">错误</span>
                <span v-else-if="log.type === 'receive'" class="tag tag-receive">{{ log.url || '接收' }}</span>
                <span>{{ log.content }}</span>
            </div>
            <div v-if="filteredLogs.length === 0" style="color: #606266; text-align: center; padding: 40px;">
                点击"连接"开始接收WebSocket消息
            </div>
        </div>
    </div>
    <script src="../../static/vue/js/vue.min.js"></script>
    <script src="../../static/vue/element/element.js"></script>
    <script>
        new Vue({
            el: '#app',
            data: {
                wsUrl: '',
                ws: null,
                connected: false,
                autoScroll: true,
                logs: [],
                filters: ['system', 'receive', 'error'],
                searchKeyword: '',
                msgCount: {}
            },
            created: function () {
                this.wsUrl = this.getDefaultUrl();
            },
            computed: {
                filteredLogs: function () {
                    var self = this;
                    return this.logs.filter(function (log) {
                        var typeMatch = self.filters.indexOf(log.type) >= 0;
                        var searchMatch = !self.searchKeyword ||
                            log.content.toLowerCase().indexOf(self.searchKeyword.toLowerCase()) >= 0 ||
                            (log.url && log.url.toLowerCase().indexOf(self.searchKeyword.toLowerCase()) >= 0);
                        return typeMatch && searchMatch;
                    });
                }
            },
            methods: {
                getDefaultUrl: function () {
                    var protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
                    return protocol + '//' + window.location.host + '/monitor/tv/socket';
                },
                toggleConnection: function () {
                    if (this.connected) {
                        this.disconnect();
                    } else {
                        this.connect();
                    }
                },
                connect: function () {
                    var self = this;
                    try {
                        this.ws = new WebSocket(this.wsUrl);
                    } catch (e) {
                        this.addLog('error', '连接失败: ' + e.message);
                        return;
                    }
                    this.ws.onopen = function () {
                        self.connected = true;
                        self.addLog('system', '连接已建立');
                    };
                    this.ws.onmessage = function (event) {
                        var url = '';
                        var content = event.data;
                        try {
                            var msg = JSON.parse(event.data);
                            url = msg.url || '';
                            if (msg.data) {
                                try {
                                    var parsed = JSON.parse(msg.data);
                                    content = JSON.stringify(parsed, null, 2);
                                } catch (e) {
                                    content = msg.data;
                                }
                            }
                        } catch (e) {
                            // 非JSON消息,直接显示
                        }
                        if (url) {
                            self.msgCount[url] = (self.msgCount[url] || 0) + 1;
                        }
                        self.addLog('receive', content, url);
                    };
                    this.ws.onclose = function () {
                        self.connected = false;
                        self.addLog('system', '连接已关闭');
                    };
                    this.ws.onerror = function () {
                        self.addLog('error', '连接发生错误');
                    };
                },
                disconnect: function () {
                    if (this.ws) {
                        this.ws.close();
                        this.ws = null;
                    }
                },
                addLog: function (type, content, url) {
                    var now = new Date();
                    var time = [
                        String(now.getHours()).padStart(2, '0'),
                        String(now.getMinutes()).padStart(2, '0'),
                        String(now.getSeconds()).padStart(2, '0')
                    ].join(':') + '.' + String(now.getMilliseconds()).padStart(3, '0');
                    this.logs.push({
                        time: time,
                        type: type,
                        content: content,
                        url: url
                    });
                    // 限制最大日志条数
                    if (this.logs.length > 2000) {
                        this.logs.splice(0, this.logs.length - 1500);
                    }
                    if (this.autoScroll) {
                        this.$nextTick(function () {
                            var panel = this.$refs.logPanel;
                            if (panel) {
                                panel.scrollTop = panel.scrollHeight;
                            }
                        });
                    }
                },
                clearLogs: function () {
                    this.logs = [];
                    this.msgCount = {};
                }
            },
            beforeDestroy: function () {
                this.disconnect();
            }
        });
    </script>
</body>
</html>