package com.zy.core.thread.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.core.common.DateUtils; import com.zy.common.utils.RedisUtil; import com.zy.core.News; import com.zy.core.cache.OutputQueue; import com.zy.core.model.ShuttleSlave; import com.zy.core.thread.ShuttleThread; import lombok.extern.slf4j.Slf4j; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetAddress; import java.net.Socket; import java.text.MessageFormat; import java.util.*; @Slf4j @SuppressWarnings("all") public class NyShuttleThread implements ShuttleThread { private ShuttleSlave slave; private RedisUtil redisUtil; private Socket socket; private static final boolean DEBUG = false;//调试模式 private List socketReadResults = new ArrayList<>(); private List socketResults = new ArrayList<>(); public NyShuttleThread(ShuttleSlave slave, RedisUtil redisUtil) { this.slave = slave; this.redisUtil = redisUtil; } @Override public void run() { News.info("{}号四向车线程启动", slave.getId()); this.connect(); //监听消息并存储 Thread innerThread = new Thread(() -> { while (true) { try { listenSocketMessage(); } catch (Exception e) { e.printStackTrace(); } } }); innerThread.start(); // //设备执行 // Thread executeThread = new Thread(() -> { // while (true) { // try { // ShuttleAction shuttleAction = SpringUtils.getBean(ShuttleAction.class); // if (shuttleAction == null) { // continue; // } // // Object object = redisUtil.get(RedisKeyType.SHUTTLE_FLAG.key + slave.getId()); // if (object == null) { // continue; // } // // Integer taskNo = Integer.valueOf(String.valueOf(object)); // if (taskNo != 0) { // //存在任务需要执行 // boolean result = shuttleAction.executeWork(slave.getId(), taskNo); // } // //// //小车空闲且有跑库程序 //// shuttleAction.moveLoc(slave.getId()); // // //演示模式 // shuttleAction.demo(slave.getId()); // // Thread.sleep(200); // } catch (Exception e) { // e.printStackTrace(); // } // } // }); // executeThread.start(); } private void listenSocketMessage() { try { if (this.socket == null) { return; } // 获取输入流 BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); // 读取服务器的响应 StringBuffer sb = new StringBuffer(); char[] chars = new char[2048];//缓冲区 while (true) { reader.read(chars); String trim = new String(chars); sb.append(trim); if (trim.lastIndexOf("\r\n") != -1) { break; } } JSONObject result = JSON.parseObject(sb.toString());//得到响应结果集 String msgType = result.getString("msgType"); if ("responseMsg".equals(msgType)) { JSONObject response = result.getJSONObject("response"); JSONObject body = response.getJSONObject("body"); if (body.containsKey("workingMode")) { //read socketReadResults.add(body); return; } } if (!socketResults.isEmpty() && socketResults.size() >= 20) { socketResults.remove(0);//清理头节点 } socketResults.add(result);//添加数据 } catch (Exception e) { // e.printStackTrace(); } } // public JSONObject getRequestBody(String type, String taskId) { // try { // // 获取服务器响应 // JSONObject result = null; // if (type.equals("readState")) { // type = "state"; // } // // for (int i = 0; i < socketResults.size(); i++) { // JSONObject socketResult = socketResults.get(i); // if (!socketResult.get("msgType").equals("responseMsg")) {//不是响应内容 // continue; // } // // JSONObject resultResponse = JSON.parseObject(socketResult.get("response").toString()); // JSONObject resultBody = JSON.parseObject(resultResponse.get("body").toString()); // String responseType = resultBody.get("responseType").toString(); // if (DEBUG) { // result = socketResult; // break; // } // // if (!responseType.equals(type)) { // continue;//响应类型与请求类型不一致,不在调试模式下 // } // // if (taskId != null) { // String responseTaskId = resultBody.get("taskId").toString(); // if (!responseTaskId.equals(taskId)) { // continue;//响应ID与请求ID不一致,不在调试模式下 // } // } // // result = socketResult; // break; // } // // if (result == null) { // return null;//无响应结果 // } // // return filterBodyData(result);//返回Body结果集 // } catch (Exception e) { // return null; // } // } @Override public boolean connect() { try { InetAddress address = InetAddress.getByName(slave.getIp()); if (address.isReachable(10000)) { Socket socket = new Socket(slave.getIp(), slave.getPort()); socket.setSoTimeout(10000); socket.setKeepAlive(true); this.socket = socket; log.info(MessageFormat.format("【{0}】四向穿梭车Socket链接成功 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort())); } } catch (Exception e) { OutputQueue.SHUTTLE.offer(MessageFormat.format("【{0}】四向穿梭车Socket链接失败 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort())); return false; } return true; } @Override public void close() { } // //发出请求 // private JSONObject requestCommand(NyShuttleHttpCommand httpCommand) throws IOException { // try { // if (this.socket == null) { // return null; // } // // //压缩数据包 // JSONObject data = JSON.parseObject(JSON.toJSONString(httpCommand)); // data.remove("nodes"); // // // 获取输出流 // OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream()); // writer.write(JSON.toJSONString(data) + "\r\n"); // writer.flush(); //// System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand)); // // String requestType = null; // String taskId = null; // try { // requestType = httpCommand.getRequest().getBody().get("requestType").toString(); // taskId = httpCommand.getRequest().getBody().get("taskId").toString(); // } catch (Exception e) { //// return null; // //taskId可能取空,不报错,正常情况 // } // // // 获取服务器响应 // // 尝试10次 // JSONObject result = null; // for (int i = 0; i < 10; i++) { // result = getRequestBody(requestType, taskId); // if (result == null) { // try { // Thread.sleep(100); // } catch (Exception e) { // e.printStackTrace(); // } // }else { // break; // } // } // return result;//返回Body结果集 // }catch (Exception e) { // e.printStackTrace(); // } // return null; // } // // private void requestCommandAsync(NyShuttleHttpCommand httpCommand) throws IOException { // if (this.socket == null) { // return; // } // // try { // //压缩数据包 // JSONObject data = JSON.parseObject(JSON.toJSONString(httpCommand)); // data.remove("nodes"); // // // 获取输出流 // OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream()); // writer.write(JSON.toJSONString(data) + "\r\n"); // writer.flush(); //// System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand)); // }catch (Exception e) { //// e.printStackTrace(); //// System.out.println("socket write error"); // this.socket.close(); // this.socket = null; // } // } // // private JSONObject filterBodyData(JSONObject data) { // Object response = data.get("response"); // if (response == null) { // return null; // } // // JSONObject result = JSON.parseObject(response.toString()); // Object body = result.get("body"); // if (body == null) { // return null; // } // JSONObject jsonBody = JSON.parseObject(body.toString()); // return jsonBody; // } }