|  |  | 
 |  |  | import com.zy.common.utils.NavigatePositionConvert; | 
 |  |  | import com.zy.common.utils.RedisUtil; | 
 |  |  | import com.zy.core.News; | 
 |  |  | import com.zy.core.Utils.DeviceMsgUtils; | 
 |  |  | import com.zy.core.action.ShuttleAction; | 
 |  |  | import com.zy.core.cache.OutputQueue; | 
 |  |  | import com.zy.core.enums.*; | 
 |  |  | import com.zy.core.model.CommandResponse; | 
 |  |  | import com.zy.core.model.DeviceMsgModel; | 
 |  |  | import com.zy.core.model.ShuttleSlave; | 
 |  |  | import com.zy.core.model.command.NyShuttleHttpCommand; | 
 |  |  | import com.zy.core.model.command.ShuttleCommand; | 
 |  |  | 
 |  |  | import lombok.Data; | 
 |  |  | import lombok.extern.slf4j.Slf4j; | 
 |  |  |  | 
 |  |  | import java.io.BufferedReader; | 
 |  |  | import java.io.IOException; | 
 |  |  | import java.io.InputStreamReader; | 
 |  |  | import java.io.OutputStreamWriter; | 
 |  |  | import java.net.InetAddress; | 
 |  |  | import java.net.Socket; | 
 |  |  | import java.text.MessageFormat; | 
 |  |  | import java.util.*; | 
 |  |  |  | 
 |  |  | 
 |  |  |     private ShuttleSlave slave; | 
 |  |  |     private RedisUtil redisUtil; | 
 |  |  |     private ShuttleProtocol shuttleProtocol; | 
 |  |  |     private Socket socket; | 
 |  |  |  | 
 |  |  |     private static final boolean DEBUG = false;//调试模式 | 
 |  |  |  | 
 |  |  | 
 |  |  |     @Override | 
 |  |  |     public void run() { | 
 |  |  |         News.info("{}号四向车线程启动", slave.getId()); | 
 |  |  |         this.connect(); | 
 |  |  |  | 
 |  |  |         //监听消息并存储 | 
 |  |  |         Thread innerThread = new Thread(() -> { | 
 |  |  |             while (true) { | 
 |  |  |                 try { | 
 |  |  |                     listenSocketMessage(); | 
 |  |  |                     listenInit();//监听初始化事件 | 
 |  |  |                 } catch (Exception e) { | 
 |  |  |                     e.printStackTrace(); | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |         }); | 
 |  |  |         innerThread.start(); | 
 |  |  |  | 
 |  |  |         //设备读取 | 
 |  |  |         Thread readThread = new Thread(() -> { | 
 |  |  |             while (true) { | 
 |  |  |                 try { | 
 |  |  |                     listenMessageFromRedis(); | 
 |  |  |                     listenInit();//监听初始化事件 | 
 |  |  |                     read(); | 
 |  |  |                     Thread.sleep(50); | 
 |  |  |                 } catch (Exception e) { | 
 |  |  | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     private void listenSocketMessage() { | 
 |  |  |     private void listenMessageFromRedis() { | 
 |  |  |         try { | 
 |  |  |             if (this.socket == null) { | 
 |  |  |             DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class); | 
 |  |  |             if (deviceMsgUtils == null) { | 
 |  |  |                 return; | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             // 获取输入流 | 
 |  |  |             BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); | 
 |  |  |             // 读取服务器的响应 | 
 |  |  |             StringBuffer sb = new StringBuffer(); | 
 |  |  |             char[] chars = new char[2048];//缓冲区 | 
 |  |  |             while (true) { | 
 |  |  |                 reader.read(chars); | 
 |  |  |                 String trim = new String(chars); | 
 |  |  |                 sb.append(trim); | 
 |  |  |                 if (trim.lastIndexOf("\r\n") != -1) { | 
 |  |  |                     break; | 
 |  |  |                 } | 
 |  |  |             DeviceMsgModel deviceMsg = deviceMsgUtils.getDeviceMsg(SlaveType.Shuttle, slave.getId()); | 
 |  |  |             if(deviceMsg == null){ | 
 |  |  |                 return; | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             JSONObject result = JSON.parseObject(sb.toString());//得到响应结果集 | 
 |  |  |             JSONObject result = JSON.parseObject(deviceMsg.getDeviceMsg().toString());//得到响应结果集 | 
 |  |  |  | 
 |  |  |             String msgType = result.getString("msgType"); | 
 |  |  |             if ("responseMsg".equals(msgType)) { | 
 |  |  | 
 |  |  |  | 
 |  |  |     private void read() { | 
 |  |  |         try { | 
 |  |  |             if (this.socket == null || this.socket.isClosed()) { | 
 |  |  |                 //链接断开重新链接 | 
 |  |  |                 this.connect(); | 
 |  |  |             } | 
 |  |  |             readStatus(); | 
 |  |  |         } catch (Exception e) { | 
 |  |  |             e.printStackTrace(); | 
 |  |  | 
 |  |  |                 InnerSuhttleExtend extend = new InnerSuhttleExtend(); | 
 |  |  |                 shuttleProtocol.setExtend(extend); | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             //----------读取四向穿梭车状态----------- | 
 |  |  |             NyShuttleHttpCommand readStatusCommand = getReadStatusCommand(slave.getId()); | 
 |  |  |             requestCommandAsync(readStatusCommand);//请求状态 | 
 |  |  |  | 
 |  |  |             if (this.socketReadResults.isEmpty()) { | 
 |  |  |                 if (System.currentTimeMillis() - shuttleProtocol.getLastOnlineTime() > 1000 * 60) { | 
 |  |  | 
 |  |  |         } catch (Exception e) { | 
 |  |  |             e.printStackTrace(); | 
 |  |  |             OutputQueue.SHUTTLE.offer(MessageFormat.format("【{0}】四向穿梭车Socket状态信息失败 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort())); | 
 |  |  |             try { | 
 |  |  |                 this.socket.close(); | 
 |  |  |                 this.socket = null; | 
 |  |  |                 Thread.sleep(1000); | 
 |  |  |                 this.connect(); | 
 |  |  |             } catch (IOException | InterruptedException exception) { | 
 |  |  |                 e.printStackTrace(); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public boolean connect() { | 
 |  |  |         try { | 
 |  |  |             InetAddress address = InetAddress.getByName(slave.getIp()); | 
 |  |  |             if (address.isReachable(10000)) { | 
 |  |  |                 Socket socket = new Socket(slave.getIp(), slave.getPort()); | 
 |  |  |                 socket.setSoTimeout(10000); | 
 |  |  |                 socket.setKeepAlive(true); | 
 |  |  |                 this.socket = socket; | 
 |  |  |                 log.info(MessageFormat.format("【{0}】四向穿梭车Socket链接成功 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort())); | 
 |  |  |             } | 
 |  |  |         } catch (Exception e) { | 
 |  |  |             OutputQueue.SHUTTLE.offer(MessageFormat.format("【{0}】四向穿梭车Socket链接失败 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort())); | 
 |  |  |             return false; | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         return true; | 
 |  |  |     } | 
 |  |  |  | 
 |  |  | 
 |  |  |     //发出请求 | 
 |  |  |     private JSONObject requestCommand(NyShuttleHttpCommand httpCommand) throws IOException { | 
 |  |  |         try { | 
 |  |  |             if (this.socket == null) { | 
 |  |  |             DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class); | 
 |  |  |             if (deviceMsgUtils == null) { | 
 |  |  |                 return null; | 
 |  |  |             } | 
 |  |  |  | 
 |  |  | 
 |  |  |             JSONObject data = JSON.parseObject(JSON.toJSONString(httpCommand)); | 
 |  |  |             data.remove("nodes"); | 
 |  |  |  | 
 |  |  |             // 获取输出流 | 
 |  |  |             OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream()); | 
 |  |  |             writer.write(JSON.toJSONString(data) + "\r\n"); | 
 |  |  |             writer.flush(); | 
 |  |  | //            System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand)); | 
 |  |  |             String key = deviceMsgUtils.sendCommand(SlaveType.Shuttle, slave.getId(), data); | 
 |  |  |  | 
 |  |  |             String requestType = null; | 
 |  |  |             String taskId = null; | 
 |  |  | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             // 获取服务器响应 | 
 |  |  |             // 尝试10次 | 
 |  |  |             // 尝试30次 | 
 |  |  |             JSONObject result = null; | 
 |  |  |             for (int i = 0; i < 10; i++) { | 
 |  |  |             for (int i = 0; i < 30; i++) { | 
 |  |  |                 result = getRequestBody(requestType, taskId); | 
 |  |  |                 if (result == null) { | 
 |  |  |                     try { | 
 |  |  | 
 |  |  |             e.printStackTrace(); | 
 |  |  |         } | 
 |  |  |         return null; | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     private void requestCommandAsync(NyShuttleHttpCommand httpCommand) throws IOException { | 
 |  |  |         if (this.socket == null) { | 
 |  |  |             return; | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |        try { | 
 |  |  |            //压缩数据包 | 
 |  |  |            JSONObject data = JSON.parseObject(JSON.toJSONString(httpCommand)); | 
 |  |  |            data.remove("nodes"); | 
 |  |  |  | 
 |  |  |            // 获取输出流 | 
 |  |  |            OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream()); | 
 |  |  |            writer.write(JSON.toJSONString(data) + "\r\n"); | 
 |  |  |            writer.flush(); | 
 |  |  | //            System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand)); | 
 |  |  |        }catch (Exception e) { | 
 |  |  | //           e.printStackTrace(); | 
 |  |  | //           System.out.println("socket write error"); | 
 |  |  |            this.socket.close(); | 
 |  |  |            this.socket = null; | 
 |  |  |        } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     private JSONObject filterBodyData(JSONObject data) { |