package com.zy.core.thread.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.core.common.DateUtils; import com.core.common.SpringUtils; import com.zy.common.utils.RedisUtil; import com.zy.core.News; import com.zy.core.Utils.DeviceMsgUtils; import com.zy.core.cache.OutputQueue; import com.zy.core.enums.SlaveType; import com.zy.core.model.ShuttleSlave; import com.zy.core.thread.ShuttleThread; import lombok.extern.slf4j.Slf4j; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetAddress; import java.net.Socket; import java.text.MessageFormat; import java.util.*; @Slf4j @SuppressWarnings("all") public class NyShuttleThread implements ShuttleThread { private ShuttleSlave slave; private RedisUtil redisUtil; private Socket socket; private static final boolean DEBUG = false;//调试模式 public NyShuttleThread(ShuttleSlave slave, RedisUtil redisUtil) { this.slave = slave; this.redisUtil = redisUtil; } @Override public void run() { News.info("{}号四向车线程启动", slave.getId()); this.connect(); //监听消息 Thread innerThread = new Thread(() -> { while (true) { try { listenSocketMessage(); } catch (Exception e) { e.printStackTrace(); } } }); innerThread.start(); //执行指令 Thread executeThread = new Thread(() -> { while (true) { try { DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class); Object deviceCommandMsg = deviceMsgUtils.getDeviceCommandMsg(SlaveType.Shuttle, slave.getId()); if (deviceCommandMsg == null) { continue; } executeCommand(deviceCommandMsg); Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } } }); executeThread.start(); } private void executeCommand(Object deviceCommandMsg) { try { if (this.socket == null) { return; } // 获取输出流 OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream()); writer.write(JSON.toJSONString(deviceCommandMsg) + "\r\n"); writer.flush(); // System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand)); }catch (Exception e) { e.printStackTrace(); } } private void listenSocketMessage() { try { if (this.socket == null) { return; } DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class); if(deviceMsgUtils == null) { return; } // 获取输入流 BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); // 读取服务器的响应 StringBuffer sb = new StringBuffer(); char[] chars = new char[2048];//缓冲区 while (true) { reader.read(chars); String trim = new String(chars); sb.append(trim); if (trim.lastIndexOf("\r\n") != -1) { break; } } JSONObject result = JSON.parseObject(sb.toString());//得到响应结果集 deviceMsgUtils.sendDeviceMsg(SlaveType.Shuttle, slave.getId(), result); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean connect() { try { InetAddress address = InetAddress.getByName(slave.getIp()); if (address.isReachable(10000)) { Socket socket = new Socket(slave.getIp(), slave.getPort()); socket.setSoTimeout(10000); socket.setKeepAlive(true); this.socket = socket; log.info(MessageFormat.format("【{0}】四向穿梭车Socket链接成功 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort())); } } catch (Exception e) { OutputQueue.SHUTTLE.offer(MessageFormat.format("【{0}】四向穿梭车Socket链接失败 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort())); return false; } return true; } @Override public void close() { } }