#
vincentlu
2026-04-13 51481277710894cbffdcaf992b0a00a047039f16
#
2个文件已添加
1个文件已删除
351 ■■■■■ 已修改文件
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/listen/HkInstantActionMessageListener.java 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/listen/HkOrderMessageListener.java 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/listen/MessageListener.java 149 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/listen/HkInstantActionMessageListener.java
New file
@@ -0,0 +1,98 @@
package com.zy.acs.hk.latent.listen;
import com.alibaba.fastjson.JSON;
import com.zy.acs.common.constant.RedisConstant;
import com.zy.acs.common.hk.action.HkInstantActionDown;
import com.zy.acs.common.hk.action.HkInstantActionMessage;
import com.zy.acs.common.utils.RedisSupport;
import com.zy.acs.framework.common.Cools;
import com.zy.acs.hk.latent.mqtt.publisher.HkInstantActionPublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * 海康 instantActions 下发监听器。
 */
@Slf4j
@Component
public class HkInstantActionMessageListener {
    private static final long POLL_INTERVAL_MS = 30L;
    private Thread thread;
    private final RedisSupport redis = RedisSupport.defaultRedisSupport;
    @Autowired
    private HkInstantActionPublisher hkInstantActionPublisher;
    @PostConstruct
    private void start() {
        thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                HkInstantActionDown instantActionDown = redis.pop(RedisConstant.HK_AGV_INSTANT_ACTION_DOWN_FLAG);
                if (instantActionDown != null) {
                    handle(instantActionDown);
                }
                try {
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception ignore) {
                }
            }
        }, "hk-instant-action-message-listener");
        thread.start();
    }
    @PreDestroy
    public void shutDown() {
        if (thread != null) {
            thread.interrupt();
        }
    }
    private void handle(HkInstantActionDown instantActionDown) {
        try {
            log.info("consume hk instant action down >>> {}", JSON.toJSONString(instantActionDown));
            HkInstantActionMessage instantActionMessage = normalize(instantActionDown);
            if (instantActionMessage == null) {
                log.error("drop invalid hk instant action message, payload={}", JSON.toJSONString(instantActionDown));
                return;
            }
            hkInstantActionPublisher.publish(instantActionMessage);
        } catch (IllegalArgumentException e) {
            log.error("drop illegal hk instant action message, payload={}", JSON.toJSONString(instantActionDown), e);
        } catch (Exception e) {
            log.error("publish hk instant action failed, requeue to head payload={}", JSON.toJSONString(instantActionDown), e);
            try {
                redis.pushHeadStrict(RedisConstant.HK_AGV_INSTANT_ACTION_DOWN_FLAG, instantActionDown);
            } catch (Exception pushEx) {
                log.error("requeue hk instant action failed, payload={}", JSON.toJSONString(instantActionDown), pushEx);
            }
        }
    }
    private HkInstantActionMessage normalize(HkInstantActionDown instantActionDown) {
        if (instantActionDown == null || instantActionDown.getInstantActionMessage() == null) {
            return null;
        }
        HkInstantActionMessage instantActionMessage = instantActionDown.getInstantActionMessage();
        if (Cools.isEmpty(instantActionMessage.getSerialNumber()) && !Cools.isEmpty(instantActionDown.getAgvNo())) {
            instantActionMessage.setSerialNumber(instantActionDown.getAgvNo());
        }
        if (Cools.isEmpty(instantActionMessage.getSerialNumber())) {
            return null;
        }
        if (Cools.isEmpty(instantActionMessage.getInstantActions())) {
            return null;
        }
        return instantActionMessage;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/listen/HkOrderMessageListener.java
New file
@@ -0,0 +1,104 @@
package com.zy.acs.hk.latent.listen;
import com.alibaba.fastjson.JSON;
import com.zy.acs.common.constant.RedisConstant;
import com.zy.acs.common.hk.order.HkOrderDown;
import com.zy.acs.common.hk.order.HkOrderMessage;
import com.zy.acs.common.utils.RedisSupport;
import com.zy.acs.framework.common.Cools;
import com.zy.acs.hk.latent.mqtt.publisher.HkOrderPublisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * 海康 order 下发监听器。
 */
@Slf4j
@Component
public class HkOrderMessageListener {
    private static final long POLL_INTERVAL_MS = 30L;
    private Thread thread;
    private final RedisSupport redis = RedisSupport.defaultRedisSupport;
    @Autowired
    private HkOrderPublisher hkOrderPublisher;
    @PostConstruct
    private void start() {
        thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                HkOrderDown orderDown = redis.pop(RedisConstant.HK_AGV_PATH_DOWN_FLAG);
                if (orderDown != null) {
                    handle(orderDown);
                }
                try {
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception ignore) {
                }
            }
        }, "hk-order-message-listener");
        thread.start();
    }
    @PreDestroy
    public void shutDown() {
        if (thread != null) {
            thread.interrupt();
        }
    }
    private void handle(HkOrderDown orderDown) {
        try {
            log.info("consume hk order down >>> {}", JSON.toJSONString(orderDown));
            HkOrderMessage orderMessage = normalize(orderDown);
            if (orderMessage == null) {
                log.error("drop invalid hk order message, payload={}", JSON.toJSONString(orderDown));
                return;
            }
            hkOrderPublisher.publish(orderMessage);
        } catch (IllegalArgumentException e) {
            log.error("drop illegal hk order message, payload={}", JSON.toJSONString(orderDown), e);
        } catch (Exception e) {
            log.error("publish hk order failed, requeue to head payload={}", JSON.toJSONString(orderDown), e);
            try {
                redis.pushHeadStrict(RedisConstant.HK_AGV_PATH_DOWN_FLAG, orderDown);
            } catch (Exception pushEx) {
                log.error("requeue hk order failed, payload={}", JSON.toJSONString(orderDown), pushEx);
            }
        }
    }
    private HkOrderMessage normalize(HkOrderDown orderDown) {
        if (orderDown == null || orderDown.getOrderMessage() == null) {
            return null;
        }
        HkOrderMessage orderMessage = orderDown.getOrderMessage();
        if (Cools.isEmpty(orderMessage.getSerialNumber()) && !Cools.isEmpty(orderDown.getAgvNo())) {
            orderMessage.setSerialNumber(orderDown.getAgvNo());
        }
        if (Cools.isEmpty(orderMessage.getOrderId()) && !Cools.isEmpty(orderDown.getActionGroupId())) {
            orderMessage.setOrderId(orderDown.getActionGroupId());
        }
        if (Cools.isEmpty(orderMessage.getSerialNumber())) {
            return null;
        }
        if (Cools.isEmpty(orderMessage.getOrderId())) {
            return null;
        }
        if (orderMessage.getOrderUpdateId() == null) {
            orderMessage.setOrderUpdateId(0L);
        }
        return orderMessage;
    }
}
zy-acs-hk/zy-acs-hk-latent/src/main/java/com/zy/acs/hk/latent/listen/MessageListener.java
File was deleted