#
Junjie
2025-07-04 ba7530a23220c82586013aa24c05647ce041fd2a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.zy.core.thread.impl;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.core.common.DateUtils;
import com.core.common.SpringUtils;
import com.zy.common.utils.RedisUtil;
import com.zy.core.News;
import com.zy.core.utils.DeviceMsgUtils;
import com.zy.core.cache.OutputQueue;
import com.zy.core.enums.SlaveType;
import com.zy.core.model.ShuttleSlave;
import com.zy.core.thread.ShuttleThread;
import lombok.extern.slf4j.Slf4j;
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.text.MessageFormat;
import java.util.*;
 
@Slf4j
@SuppressWarnings("all")
public class NyShuttleThread implements ShuttleThread {
 
    private ShuttleSlave slave;
    private RedisUtil redisUtil;
    private Socket socket;
 
    private static final boolean DEBUG = false;//调试模式
 
    public NyShuttleThread(ShuttleSlave slave, RedisUtil redisUtil) {
        this.slave = slave;
        this.redisUtil = redisUtil;
    }
 
    @Override
    public void run() {
        News.info("{}号四向车线程启动", slave.getId());
        this.connect();
 
        //监听消息
        Thread innerThread = new Thread(() -> {
            while (true) {
                try {
                    listenSocketMessage();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        innerThread.start();
 
        //执行指令
        Thread executeThread = new Thread(() -> {
            while (true) {
                try {
                    DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class);
                    Object deviceCommandMsg = deviceMsgUtils.getDeviceCommandMsg(SlaveType.Shuttle, slave.getId());
                    if (deviceCommandMsg == null) {
                        continue;
                    }
                    executeCommand(deviceCommandMsg);
 
                    Thread.sleep(200);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executeThread.start();
    }
 
    private void executeCommand(Object deviceCommandMsg) {
        try {
            if (this.socket == null) {
                return;
            }
 
            // 获取输出流
            OutputStreamWriter writer = new OutputStreamWriter(this.socket.getOutputStream());
            writer.write(JSON.toJSONString(deviceCommandMsg) + "\r\n");
            writer.flush();
//            System.out.println("Sent message to server: " + JSON.toJSONString(httpCommand));
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    private void listenSocketMessage() {
        try {
            if (this.socket == null) {
                return;
            }
 
            DeviceMsgUtils deviceMsgUtils = SpringUtils.getBean(DeviceMsgUtils.class);
            if(deviceMsgUtils == null) {
                return;
            }
 
            // 获取输入流
            BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            // 读取服务器的响应
            StringBuffer sb = new StringBuffer();
            char[] chars = new char[2048];//缓冲区
            while (true) {
                reader.read(chars);
                String trim = new String(chars);
                sb.append(trim);
                if (trim.lastIndexOf("\r\n") != -1) {
                    break;
                }
            }
 
            JSONObject result = JSON.parseObject(sb.toString());//得到响应结果集
            deviceMsgUtils.sendDeviceMsg(SlaveType.Shuttle, slave.getId(), result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    @Override
    public boolean connect() {
        try {
            InetAddress address = InetAddress.getByName(slave.getIp());
            if (address.isReachable(10000)) {
                Socket socket = new Socket(slave.getIp(), slave.getPort());
                socket.setSoTimeout(10000);
                socket.setKeepAlive(true);
                this.socket = socket;
                log.info(MessageFormat.format("【{0}】四向穿梭车Socket链接成功 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort()));
            }
        } catch (Exception e) {
            OutputQueue.SHUTTLE.offer(MessageFormat.format("【{0}】四向穿梭车Socket链接失败 ===>> [id:{1}] [ip:{2}] [port:{3}]", DateUtils.convert(new Date()), slave.getId(), slave.getIp(), slave.getPort()));
            return false;
        }
 
        return true;
    }
 
    @Override
    public void close() {
 
    }
}