From 3e3959bf84c673b3cb366e6225a6b5c8dfbea903 Mon Sep 17 00:00:00 2001
From: luxiaotao1123 <t1341870251@163.com>
Date: 星期三, 05 八月 2020 14:09:04 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/channel/TestController.java | 22 +++++
src/main/java/com/zy/core/cache/MessageQueue.java | 96 ++++++++++++++++++++++++
src/main/java/com/zy/core/thread/CrnThread.java | 18 +++
src/main/java/com/zy/core/model/Task.java | 13 +++
src/main/resources/application.yml | 12 +--
src/main/java/com/zy/core/ServerBootstrap.java | 26 ++++++
6 files changed, 176 insertions(+), 11 deletions(-)
diff --git a/src/main/java/com/zy/core/ServerBootstrap.java b/src/main/java/com/zy/core/ServerBootstrap.java
index b168c21..25db737 100644
--- a/src/main/java/com/zy/core/ServerBootstrap.java
+++ b/src/main/java/com/zy/core/ServerBootstrap.java
@@ -1,6 +1,7 @@
package com.zy.core;
import com.core.common.Cools;
+import com.zy.core.cache.MessageQueue;
import com.zy.core.cache.SlaveConnection;
import com.zy.core.enums.SlaveType;
import com.zy.core.properties.SlaveProperties;
@@ -26,10 +27,34 @@
@PostConstruct
public void init(){
+ // 鍒濆鍖栨秷鎭槦鍒�
+ initMq();
// 鍒濆鍖栦笅浣嶆満绾跨▼
initThread();
}
+ private void initMq(){
+ // 鍒濆鍖栧爢鍨涙満mq
+ for (Slave crn : slaveProperties.getCrn()) {
+ MessageQueue.init(SlaveType.Crn, crn);
+ }
+ // 鍒濆鍖栬緭閫佺嚎mq
+ for (Slave devp : slaveProperties.getDevp()) {
+ MessageQueue.init(SlaveType.Devp, devp);
+ }
+ // 鍒濆鍖栨潯鐮佹壂鎻忎华mq
+ for (Slave barcode : slaveProperties.getBarcode()) {
+ MessageQueue.init(SlaveType.Barcode, barcode);
+ }
+ // 鍒濆鍖朙ed鐏痬q
+ for (Slave led : slaveProperties.getLed()) {
+ MessageQueue.init(SlaveType.Led, led);
+ }
+ // 鍒濆鍖栫绉癿q
+ for (Slave scale : slaveProperties.getScale()) {
+ MessageQueue.init(SlaveType.Scale, scale);
+ }
+ }
private void initThread(){
// 鍒濆鍖栧爢鍨涙満绾跨▼
@@ -43,6 +68,7 @@
for (Slave devo : slaveProperties.getDevp()) {
DevpThread devpThread = new DevpThread(devo);
new Thread(devpThread).start();
+ SlaveConnection.put(SlaveType.Devp, devo.getId(), devpThread);
}
}
diff --git a/src/main/java/com/zy/core/cache/MessageQueue.java b/src/main/java/com/zy/core/cache/MessageQueue.java
new file mode 100644
index 0000000..20d588c
--- /dev/null
+++ b/src/main/java/com/zy/core/cache/MessageQueue.java
@@ -0,0 +1,96 @@
+package com.zy.core.cache;
+
+import com.zy.core.Slave;
+import com.zy.core.ThreadHandler;
+import com.zy.core.enums.SlaveType;
+import com.zy.core.model.Task;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 娑堟伅闃熷垪
+ * Created by vincent on 2020/8/5
+ */
+public class MessageQueue {
+
+ // 鍫嗗灈鏈簃q浜ゆ崲鏈�
+ private static final Map<Integer, ConcurrentLinkedQueue<Task>> CRN_EXCHANGE = new ConcurrentHashMap<>();
+ // 杈撻�佺嚎mq浜ゆ崲鏈�
+ private static final Map<Integer, ConcurrentLinkedQueue<Task>> DEVP_EXCHANGE = new ConcurrentHashMap<>();
+ // 鏉$爜鎵弿浠猰q浜ゆ崲鏈�
+ private static final Map<Integer, ConcurrentLinkedQueue<Task>> BARCODE_EXCHANGE = new ConcurrentHashMap<>();
+ // Led鐏� mq浜ゆ崲鏈�
+ private static final Map<Integer, ConcurrentLinkedQueue<Task>> LED_EXCHANGE = new ConcurrentHashMap<>();
+ // 纾呯Оmq浜ゆ崲鏈�
+ private static final Map<Integer, ConcurrentLinkedQueue<Task>> SCALE_EXCHANGE = new ConcurrentHashMap<>();
+
+ /**
+ * mq 浜ゆ崲鏈哄垵濮嬪寲
+ **/
+ public static void init(SlaveType type, Slave slave) {
+ switch (type) {
+ case Crn:
+ CRN_EXCHANGE.put(slave.getId(), new ConcurrentLinkedQueue<>());
+ break;
+ case Devp:
+ DEVP_EXCHANGE.put(slave.getId(), new ConcurrentLinkedQueue<>());
+ break;
+ case Barcode:
+ BARCODE_EXCHANGE.put(slave.getId(), new ConcurrentLinkedQueue<>());
+ break;
+ case Led:
+ LED_EXCHANGE.put(slave.getId(), new ConcurrentLinkedQueue<>());
+ break;
+ case Scale:
+ SCALE_EXCHANGE.put(slave.getId(), new ConcurrentLinkedQueue<>());
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * 娣诲姞鍏冪礌
+ * 濡傛灉鍙戠幇闃熷垪宸叉弧鏃犳硶娣诲姞鐨勮瘽锛屼細鐩存帴杩斿洖false銆�
+ */
+ public static boolean offer(SlaveType type, Integer id, Task task) {
+ switch (type) {
+ case Crn:
+ return CRN_EXCHANGE.get(id).offer(task);
+ case Devp:
+ return DEVP_EXCHANGE.get(id).offer(task);
+ case Barcode:
+ return BARCODE_EXCHANGE.get(id).offer(task);
+ case Led:
+ return LED_EXCHANGE.get(id).offer(task);
+ case Scale:
+ return SCALE_EXCHANGE.get(id).offer(task);
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * 绉婚櫎鍏冪礌
+ * 鑻ラ槦鍒椾负绌猴紝杩斿洖null銆�
+ */
+ public static Task poll(SlaveType type, Integer id) {
+ switch (type) {
+ case Crn:
+ return CRN_EXCHANGE.get(id).poll();
+ case Devp:
+ return DEVP_EXCHANGE.get(id).poll();
+ case Barcode:
+ return BARCODE_EXCHANGE.get(id).poll();
+ case Led:
+ return LED_EXCHANGE.get(id).poll();
+ case Scale:
+ return SCALE_EXCHANGE.get(id).poll();
+ default:
+ return null;
+ }
+ }
+
+}
diff --git a/src/main/java/com/zy/core/channel/TestController.java b/src/main/java/com/zy/core/channel/TestController.java
new file mode 100644
index 0000000..8dfc07d
--- /dev/null
+++ b/src/main/java/com/zy/core/channel/TestController.java
@@ -0,0 +1,22 @@
+package com.zy.core.channel;
+
+import com.zy.core.cache.MessageQueue;
+import com.zy.core.model.Task;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Created by vincent on 2020/8/5
+ */
+@RestController
+public class TestController {
+
+ @RequestMapping("/test")
+ public String test(){
+ Task task = new Task();
+ task.setId(1);
+ MessageQueue.CRN_QUE.offer(task);
+ return "ok";
+ }
+
+}
diff --git a/src/main/java/com/zy/core/model/Task.java b/src/main/java/com/zy/core/model/Task.java
new file mode 100644
index 0000000..417a0d0
--- /dev/null
+++ b/src/main/java/com/zy/core/model/Task.java
@@ -0,0 +1,13 @@
+package com.zy.core.model;
+
+import lombok.Data;
+
+/**
+ * Created by vincent on 2020/8/5
+ */
+@Data
+public class Task {
+
+ private int id;
+
+}
diff --git a/src/main/java/com/zy/core/thread/CrnThread.java b/src/main/java/com/zy/core/thread/CrnThread.java
index a8f07c3..233adbd 100644
--- a/src/main/java/com/zy/core/thread/CrnThread.java
+++ b/src/main/java/com/zy/core/thread/CrnThread.java
@@ -2,13 +2,20 @@
import com.zy.core.Slave;
import com.zy.core.ThreadHandler;
+import com.zy.core.cache.MessageQueue;
+import com.zy.core.model.Task;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
/**
* Created by vincent on 2020/8/4
*/
+@Slf4j
+@Data
public class CrnThread implements Runnable, ThreadHandler {
private Slave slave;
+ private int distance;
public CrnThread(Slave slave) {
this.slave = slave;
@@ -20,12 +27,17 @@
while (true) {
try {
System.out.println("绾跨▼"+slave.getId()+"姝e湪杩愯");
+ Task task = MessageQueue.CRN_QUE.poll();
+ if (task == null) {
+ System.out.println("鏃犱换鍔�");
+ } else {
+
+ System.out.println("浠诲姟"+task.getId());
+ }
-
-
- Thread.sleep(1000);
+ Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 41f79b8..82cda93 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -40,14 +40,10 @@
id: 1
ip: 192.168.1.1
port: 8888
- crn[1]:
- id: 2
- ip: 192.168.1.1
- port: 8888
- crn[2]:
- id: 3
- ip: 192.168.1.1
- port: 8888
+# crn[1]:
+# id: 2
+# ip: 192.168.1.1
+# port: 8888
# 绔欑偣
devp[0]:
id: 1
--
Gitblit v1.9.1