#
Junjie
2024-04-28 987d05ba7038d87efe72b4877be911e65f7a7ecb
#
1个文件已修改
179 ■■■■■ 已修改文件
zy-asrs-wcs/src/main/java/com/zy/asrs/wcs/rcs/thread/impl/NyShuttleThread.java 179 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-asrs-wcs/src/main/java/com/zy/asrs/wcs/rcs/thread/impl/NyShuttleThread.java
@@ -56,6 +56,10 @@
    private ShuttleProtocol shuttleProtocol;
    private Socket socket;
    private static final boolean DEBUG = true;//调试模式
    private List<JSONObject> socketResults = new ArrayList<>();
    public NyShuttleThread(Device device, RedisUtil redisUtil) {
        this.device = device;
        this.redisUtil = redisUtil;
@@ -65,6 +69,15 @@
    public void run() {
        News.info("{}号四向车线程启动", device.getDeviceNo());
        this.connect();
        //监听消息并存储
        Thread innerThread = new Thread(() -> {
            while (true) {
                listenSocketMessage();
            }
        });
        innerThread.start();
        while (true) {
            try {
                read();
@@ -75,6 +88,65 @@
        }
    }
    private void listenSocketMessage() {
        try {
            if (this.socket == null) {
                return;
            }
            // 获取输入流
            BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            // 读取服务器的响应
            StringBuffer sb = new StringBuffer();
            String response = null;
            char[] chars = new char[2048];//缓冲区
            do {
                reader.read(chars);
                String trim = new String(chars).trim();
                sb.append(trim);
                if (sb.lastIndexOf("\r\n") != -1) {
                    break;
                }
            } while (response != null);
//            System.out.println("Received response from server: " + sb);
            JSONObject result = JSON.parseObject(sb.toString());//得到响应结果集
            if (!socketResults.isEmpty() && socketResults.size() >= 5) {
                socketResults.remove(0);//清理头节点
            }
            socketResults.add(result);//添加数据
        } catch (Exception e) {
//            e.printStackTrace();
        }
    }
    public JSONObject getRequestBody(Integer requestId) {
        // 获取服务器响应
        JSONObject result = null;
        for (int i = 0; i < socketResults.size(); i++) {
            JSONObject socketResult = socketResults.get(i);
            if (!socketResult.get("msgType").equals("responseMsg")) {//不是响应内容
                continue;
            }
            JSONObject resultResponse = JSON.parseObject(socketResult.get("response").toString());
            JSONObject resultHeader = JSON.parseObject(resultResponse.get("header").toString());
            int responseId = Integer.parseInt(resultHeader.get("responseId").toString());
            if (!DEBUG && responseId != requestId) {
                continue;//响应ID与请求ID不一致,不在调试模式下
            }
            result = socketResult;
            break;
        }
        if (result == null) {
            return null;//无响应结果
        }
        return filterBodyData(result);//返回Body结果集
    }
    private void read() {
        try {
            if (this.socket == null || this.socket.isClosed()) {
@@ -82,7 +154,7 @@
                this.connect();
            }
            readStatus();
            listenInit();
            listenInit();//监听初始化事件
        } catch (Exception e) {
            e.printStackTrace();
            OutputQueue.SHUTTLE.offer(MessageFormat.format("【{0}】读取四向穿梭车状态信息失败 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), device.getId(), device.getIp(), device.getPort()));
@@ -112,7 +184,7 @@
                //小车设备状态
                shuttleProtocol.setDeviceStatus(data.getInteger("free"));
                //小车模式
                shuttleProtocol.setDeviceStatus(data.getInteger("workingMode"));
                shuttleProtocol.setMode(data.getInteger("workingMode"));
                //当前二维码
                shuttleProtocol.setCurrentCode(data.getString("coord"));
                //电池电量
@@ -217,42 +289,34 @@
     */
    public void listenInit() {
        try {
            // 获取输入流
            BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            // 获取服务器响应
            JSONObject result = null;
            int removeIdx = -1;
            for (int i = 0; i < socketResults.size(); i++) {
                JSONObject socketResult = socketResults.get(i);
                if (!socketResult.get("msgType").equals("requestMsg")) {//不是请求内容
                    continue;
                }
            // 读取服务器的响应
            StringBuffer sb = new StringBuffer();
            String response = null;
            char[] chars = new char[2048];//缓冲区
            do {
                reader.read(chars);
                String trim = new String(chars).trim();
                sb.append(trim);
                if (sb.lastIndexOf("\r\n") != -1) {
                JSONObject resultResponse = JSON.parseObject(socketResult.get("request").toString());
                JSONObject resultHeader = JSON.parseObject(resultResponse.get("header").toString());
                JSONObject resultBody = JSON.parseObject(resultResponse.get("body").toString());
                String requestType = resultBody.getString("requestType");
                Integer requestId = resultHeader.getInteger("requestId");
                if (requestType.equals("init")) {
                    //小车复位请求
                    ShuttleCommand initCommand = getInitCommand(requestId);
                    //发出请求
                    NyShuttleHttpCommand httpCommand = JSON.parseObject(initCommand.getBody(), NyShuttleHttpCommand.class);
                    JSONObject requestResult = requestCommand(httpCommand);
                    removeIdx = i;//此数据已经处理,从结果集中剔除
                    break;
                }
            } while (response != null);
//            System.out.println("Received response from server: " + sb);
            JSONObject result = JSON.parseObject(sb.toString());//得到请求结果集
            if (!result.get("msgType").equals("requestMsg")) {//不是请求内容
                return;
            }
            JSONObject resultResponse = JSON.parseObject(result.get("request").toString());
            JSONObject resultHeader = JSON.parseObject(resultResponse.get("header").toString());
            JSONObject resultBody = JSON.parseObject(resultResponse.get("body").toString());
            String requestType = resultBody.getString("requestType");
            Integer requestId = resultHeader.getInteger("requestId");
            if (requestType.equals("init")) {
                //小车复位请求
                ShuttleCommand initCommand = getInitCommand(requestId);
                //发出请求
                NyShuttleHttpCommand httpCommand = JSON.parseObject(initCommand.getBody(), NyShuttleHttpCommand.class);
                JSONObject requestResult = requestCommand(httpCommand);
                if (requestResult == null) {
                    return;//请求失败
                }
            if (removeIdx != -1) {
                socketResults.remove(removeIdx);
            }
        } catch (Exception e) {
            e.printStackTrace();
@@ -645,14 +709,9 @@
    public boolean connect() {
        try {
            Socket socket = new Socket(device.getIp(),device.getPort());
            socket.setSoTimeout(60000);
            socket.setSoTimeout(10000);
            socket.setKeepAlive(true);
            this.socket = socket;
            if (null == shuttleProtocol) {
                shuttleProtocol = new ShuttleProtocol();
                shuttleProtocol.setShuttleNo(Integer.parseInt(device.getDeviceNo()));
            }
            shuttleProtocol.setProtocolStatus(ShuttleProtocolStatusType.IDLE);
            log.info(MessageFormat.format("【{0}】四向穿梭车Socket链接成功 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), device.getId(), device.getIp(), device.getPort()));
        } catch (IOException e) {
            OutputQueue.SHUTTLE.offer(MessageFormat.format("【{0}】四向穿梭车Socket链接失败 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), device.getId(), device.getIp(), device.getPort()));
@@ -719,43 +778,29 @@
        JSONObject data = JSON.parseObject(JSON.toJSONString(httpCommand));
        data.remove("nodes");
        // 获取输入流和输出流
        BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
        // 获取输出流
        OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream());
        writer.write(JSON.toJSONString(data) + "\r\n");
        writer.flush();
//            System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand));
        // 读取服务器的响应
        StringBuffer sb = new StringBuffer();
        String response = null;
        char[] chars = new char[2048];//缓冲区
        do {
            reader.read(chars);
            String trim = new String(chars).trim();
            sb.append(trim);
            if (sb.lastIndexOf("\r\n") != -1) {
                break;
        // 获取服务器响应
        // 尝试10次
        JSONObject result = null;
        for (int i = 0; i < 10; i++) {
            result = getRequestBody(httpCommand.getRequest().getHeader().getRequestId());
            if (result == null) {
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } while (response != null);
//            System.out.println("Received response from server: " + sb);
        JSONObject result = JSON.parseObject(sb.toString());//得到响应结果集
        if (!result.get("msgType").equals("responseMsg")) {//不是响应内容
            return null;
        }
        JSONObject resultResponse = JSON.parseObject(result.get("response").toString());
        JSONObject resultHeader = JSON.parseObject(resultResponse.get("header").toString());
        int responseId = Integer.parseInt(resultHeader.get("responseId").toString());
        if (responseId != httpCommand.getRequest().getHeader().getRequestId()) {
            return null;//响应ID与请求ID不一致,不在调试模式下
        }
        return filterBodyData(result);//返回Body结果集
        return result;//返回Body结果集
    }
    private static JSONObject filterBodyData(JSONObject data) {
    private JSONObject filterBodyData(JSONObject data) {
        Object response = data.get("response");
        if (response == null) {
            return null;