From f585af70f85d9f4934dcfebf03a9fd358c229fc7 Mon Sep 17 00:00:00 2001
From: zwl <1051256694@qq.com>
Date: 星期二, 01 七月 2025 10:03:19 +0800
Subject: [PATCH] 添加数据库
---
src/main/java/com/zy/core/cache/MessageQueue.java | 20 ++++++++++++++++++--
1 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/src/main/java/com/zy/core/cache/MessageQueue.java b/src/main/java/com/zy/core/cache/MessageQueue.java
index 8bb9d7c..5ffad0b 100644
--- a/src/main/java/com/zy/core/cache/MessageQueue.java
+++ b/src/main/java/com/zy/core/cache/MessageQueue.java
@@ -16,7 +16,7 @@
public class MessageQueue {
// 鍫嗗灈鏈簃q浜ゆ崲鏈�
- private static final Map<Integer, ConcurrentLinkedQueue<Task>> CRN_EXCHANGE = new ConcurrentHashMap<>();
+ private static final Map<Integer, LinkedBlockingQueue<Task>> CRN_EXCHANGE = new ConcurrentHashMap<>();
// 杈撻�佺嚎mq浜ゆ崲鏈�
private static final Map<Integer, ConcurrentLinkedQueue<Task>> DEVP_EXCHANGE = new ConcurrentHashMap<>();
// 鏉$爜鎵弿浠猰q浜ゆ崲鏈�
@@ -34,7 +34,7 @@
public static void init(SlaveType type, Slave slave) {
switch (type) {
case Crn:
- CRN_EXCHANGE.put(slave.getId(), new ConcurrentLinkedQueue<>());
+ CRN_EXCHANGE.put(slave.getId(), new LinkedBlockingQueue<>(1));
break;
case Devp:
DEVP_EXCHANGE.put(slave.getId(), new ConcurrentLinkedQueue<>());
@@ -127,6 +127,7 @@
public static void clear(SlaveType type, Integer id){
switch (type) {
case Crn:
+
CRN_EXCHANGE.get(id).clear();
break;
case Devp:
@@ -148,5 +149,20 @@
break;
}
}
+ public static boolean offer(SlaveType type, Integer devpId, Task task, Runnable callback) {
+ boolean result = offer(type, devpId, task); // 鍏堟墽琛屽師鏈変换鍔¢�昏緫
+ if (result && callback != null) {
+ new Thread(() -> {
+ try {
+ Thread.sleep(200); // 妯℃嫙浠诲姟鎵ц鏃堕棿
+ callback.run();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }).start();
+ }
+ return result;
+ }
+
}
--
Gitblit v1.9.1