From 84758497ae449590f994c885421b2c32917d4548 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期四, 03 七月 2025 15:10:19 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/thread/impl/NyShuttleThread.java |  113 ++++++++------------------------------------------------
 1 files changed, 16 insertions(+), 97 deletions(-)

diff --git a/src/main/java/com/zy/core/thread/impl/NyShuttleThread.java b/src/main/java/com/zy/core/thread/impl/NyShuttleThread.java
index 33142bc..e63336a 100644
--- a/src/main/java/com/zy/core/thread/impl/NyShuttleThread.java
+++ b/src/main/java/com/zy/core/thread/impl/NyShuttleThread.java
@@ -22,10 +22,12 @@
 import com.zy.common.utils.NavigatePositionConvert;
 import com.zy.common.utils.RedisUtil;
 import com.zy.core.News;
+import com.zy.core.Utils.DeviceMsgUtils;
 import com.zy.core.action.ShuttleAction;
 import com.zy.core.cache.OutputQueue;
 import com.zy.core.enums.*;
 import com.zy.core.model.CommandResponse;
+import com.zy.core.model.DeviceMsgModel;
 import com.zy.core.model.ShuttleSlave;
 import com.zy.core.model.command.NyShuttleHttpCommand;
 import com.zy.core.model.command.ShuttleCommand;
@@ -38,12 +40,8 @@
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.Socket;
 import java.text.MessageFormat;
 import java.util.*;
 
@@ -54,7 +52,6 @@
     private ShuttleSlave slave;
     private RedisUtil redisUtil;
     private ShuttleProtocol shuttleProtocol;
-    private Socket socket;
 
     private static final boolean DEBUG = false;//璋冭瘯妯″紡
 
@@ -72,25 +69,13 @@
     @Override
     public void run() {
         News.info("{}鍙峰洓鍚戣溅绾跨▼鍚姩", slave.getId());
-        this.connect();
-
-        //鐩戝惉娑堟伅骞跺瓨鍌�
-        Thread innerThread = new Thread(() -> {
-            while (true) {
-                try {
-                    listenSocketMessage();
-                    listenInit();//鐩戝惉鍒濆鍖栦簨浠�
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-        innerThread.start();
 
         //璁惧璇诲彇
         Thread readThread = new Thread(() -> {
             while (true) {
                 try {
+                    listenMessageFromRedis();
+                    listenInit();//鐩戝惉鍒濆鍖栦簨浠�
                     read();
                     Thread.sleep(50);
                 } catch (Exception e) {
@@ -175,27 +160,17 @@
         }
     }
 
-    private void listenSocketMessage() {
+    private void listenMessageFromRedis() {
         try {
-            if (this.socket == null) {
+            DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class);
+            if (deviceMsgUtils == null) {
                 return;
             }
-
-            // 鑾峰彇杈撳叆娴�
-            BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
-            // 璇诲彇鏈嶅姟鍣ㄧ殑鍝嶅簲
-            StringBuffer sb = new StringBuffer();
-            char[] chars = new char[2048];//缂撳啿鍖�
-            while (true) {
-                reader.read(chars);
-                String trim = new String(chars);
-                sb.append(trim);
-                if (trim.lastIndexOf("\r\n") != -1) {
-                    break;
-                }
+            DeviceMsgModel deviceMsg = deviceMsgUtils.getDeviceMsg(SlaveType.Shuttle, slave.getId());
+            if(deviceMsg == null){
+                return;
             }
-
-            JSONObject result = JSON.parseObject(sb.toString());//寰楀埌鍝嶅簲缁撴灉闆�
+            JSONObject result = JSON.parseObject(deviceMsg.getDeviceMsg().toString());//寰楀埌鍝嶅簲缁撴灉闆�
 
             String msgType = result.getString("msgType");
             if ("responseMsg".equals(msgType)) {
@@ -266,10 +241,6 @@
 
     private void read() {
         try {
-            if (this.socket == null || this.socket.isClosed()) {
-                //閾炬帴鏂紑閲嶆柊閾炬帴
-                this.connect();
-            }
             readStatus();
         } catch (Exception e) {
             e.printStackTrace();
@@ -287,10 +258,6 @@
                 InnerSuhttleExtend extend = new InnerSuhttleExtend();
                 shuttleProtocol.setExtend(extend);
             }
-
-            //----------璇诲彇鍥涘悜绌挎杞︾姸鎬�-----------
-            NyShuttleHttpCommand readStatusCommand = getReadStatusCommand(slave.getId());
-            requestCommandAsync(readStatusCommand);//璇锋眰鐘舵��
 
             if (this.socketReadResults.isEmpty()) {
                 if (System.currentTimeMillis() - shuttleProtocol.getLastOnlineTime() > 1000 * 60) {
@@ -378,14 +345,6 @@
         } catch (Exception e) {
             e.printStackTrace();
             OutputQueue.SHUTTLE.offer(MessageFormat.format("銆恵0}銆戝洓鍚戠┛姊溅Socket鐘舵�佷俊鎭け璐� ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort()));
-            try {
-                this.socket.close();
-                this.socket = null;
-                Thread.sleep(1000);
-                this.connect();
-            } catch (IOException | InterruptedException exception) {
-                e.printStackTrace();
-            }
         }
     }
 
@@ -1067,20 +1026,6 @@
 
     @Override
     public boolean connect() {
-        try {
-            InetAddress address = InetAddress.getByName(slave.getIp());
-            if (address.isReachable(10000)) {
-                Socket socket = new Socket(slave.getIp(), slave.getPort());
-                socket.setSoTimeout(10000);
-                socket.setKeepAlive(true);
-                this.socket = socket;
-                log.info(MessageFormat.format("銆恵0}銆戝洓鍚戠┛姊溅Socket閾炬帴鎴愬姛 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort()));
-            }
-        } catch (Exception e) {
-            OutputQueue.SHUTTLE.offer(MessageFormat.format("銆恵0}銆戝洓鍚戠┛姊溅Socket閾炬帴澶辫触 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort()));
-            return false;
-        }
-
         return true;
     }
 
@@ -1136,7 +1081,8 @@
     //鍙戝嚭璇锋眰
     private JSONObject requestCommand(NyShuttleHttpCommand httpCommand) throws IOException {
         try {
-            if (this.socket == null) {
+            DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class);
+            if (deviceMsgUtils == null) {
                 return null;
             }
 
@@ -1144,11 +1090,7 @@
             JSONObject data = JSON.parseObject(JSON.toJSONString(httpCommand));
             data.remove("nodes");
 
-            // 鑾峰彇杈撳嚭娴�
-            OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream());
-            writer.write(JSON.toJSONString(data) + "\r\n");
-            writer.flush();
-//            System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand));
+            String key = deviceMsgUtils.sendCommand(SlaveType.Shuttle, slave.getId(), data);
 
             String requestType = null;
             String taskId = null;
@@ -1161,9 +1103,9 @@
             }
 
             // 鑾峰彇鏈嶅姟鍣ㄥ搷搴�
-            // 灏濊瘯10娆�
+            // 灏濊瘯30娆�
             JSONObject result = null;
-            for (int i = 0; i < 10; i++) {
+            for (int i = 0; i < 30; i++) {
                 result = getRequestBody(requestType, taskId);
                 if (result == null) {
                     try {
@@ -1180,29 +1122,6 @@
             e.printStackTrace();
         }
         return null;
-    }
-
-    private void requestCommandAsync(NyShuttleHttpCommand httpCommand) throws IOException {
-        if (this.socket == null) {
-            return;
-        }
-
-       try {
-           //鍘嬬缉鏁版嵁鍖�
-           JSONObject data = JSON.parseObject(JSON.toJSONString(httpCommand));
-           data.remove("nodes");
-
-           // 鑾峰彇杈撳嚭娴�
-           OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream());
-           writer.write(JSON.toJSONString(data) + "\r\n");
-           writer.flush();
-//            System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand));
-       }catch (Exception e) {
-//           e.printStackTrace();
-//           System.out.println("socket write error");
-           this.socket.close();
-           this.socket = null;
-       }
     }
 
     private JSONObject filterBodyData(JSONObject data) {

--
Gitblit v1.9.1