package com.zy.core.task;

import com.zy.asrs.entity.BasStationOpt;
import com.zy.asrs.service.BasStationOptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Buffers {@link BasStationOpt} records in a bounded in-memory queue and persists them
 * from a task submitted through {@link MainProcessTaskSubmitter}, so that callers of
 * {@link #publish(BasStationOpt)} do not perform the save themselves.
 *
 * <p>The queue holds at most {@value #DEFAULT_QUEUE_CAPACITY} records; when it is full,
 * {@code publish} rejects the record and reports that through its return value.
 */
@Slf4j
@Component
public class BasStationOptAsyncPublisher {

    /** Lane name passed to {@code submitSerialTask} for every drain request. */
    private static final String LANE_NAME = "bas-station-opt-save";

    /** Task name passed to {@code submitSerialTask} for every drain request. */
    private static final String TASK_NAME = "publish-bas-station-opt";

    /** Minimum-interval argument passed to {@code submitSerialTask}. */
    private static final long MIN_INTERVAL_MS = 0L;

    /** Maximum number of records that may wait in {@link #pendingQueue}. */
    private static final int DEFAULT_QUEUE_CAPACITY = 2048;

    /**
     * Records waiting to be saved. The element type is declared explicitly so that
     * {@code poll()} yields a {@link BasStationOpt} instead of a raw {@code Object}.
     */
    private final ArrayBlockingQueue<BasStationOpt> pendingQueue =
            new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);

    @Autowired
    private MainProcessTaskSubmitter mainProcessTaskSubmitter;

    @Autowired
    private BasStationOptService basStationOptService;

    /**
     * Enqueues a record for saving and submits a drain task.
     *
     * @param basStationOpt the record to persist; {@code null} is ignored
     * @return {@code true} if the record was {@code null} or was enqueued;
     *         {@code false} if the queue was full and the record was NOT accepted
     *         (the log message indicates the caller is expected to save it
     *         synchronously - NOTE(review): confirm against callers)
     */
    public boolean publish(BasStationOpt basStationOpt) {
        if (basStationOpt == null) {
            // Nothing to persist; reported as success.
            return true;
        }
        // offer() never blocks: it returns false immediately when the queue is at capacity.
        if (!pendingQueue.offer(basStationOpt)) {
            log.error("BasStationOpt async publish queue full, fallback to sync save, taskNo={}, stationId={}, targetStationId={}",
                    basStationOpt.getTaskNo(),
                    basStationOpt.getStationId(),
                    basStationOpt.getTargetStationId());
            return false;
        }
        // A drain is requested for every accepted record; a drain that finds the queue
        // already empty simply returns.
        // NOTE(review): presumably tasks on the same lane run one at a time - confirm
        // against MainProcessTaskSubmitter.
        mainProcessTaskSubmitter.submitSerialTask(LANE_NAME, TASK_NAME, MIN_INTERVAL_MS, this::drain);
        return true;
    }

    /**
     * Saves queued records one by one until the queue is empty.
     *
     * <p>A failed save is logged and retried once immediately. If the retry also fails,
     * the failure is logged and the record is dropped so the remaining records can
     * still be processed.
     */
    private void drain() {
        while (true) {
            BasStationOpt basStationOpt = pendingQueue.poll();
            if (basStationOpt == null) {
                // Queue is empty: this drain run is finished.
                return;
            }
            try {
                basStationOptService.save(basStationOpt);
            } catch (Exception e) {
                log.error("BasStationOpt async publish error, fallback to sync save next time, taskNo={}, stationId={}, targetStationId={}",
                        basStationOpt.getTaskNo(),
                        basStationOpt.getStationId(),
                        basStationOpt.getTargetStationId(),
                        e);
                // Single immediate retry of the same record.
                try {
                    basStationOptService.save(basStationOpt);
                } catch (Exception ex) {
                    // Retry failed as well: log and move on; the record is not re-queued.
                    log.error("BasStationOpt sync fallback save error, taskNo={}, stationId={}, targetStationId={}",
                            basStationOpt.getTaskNo(),
                            basStationOpt.getStationId(),
                            basStationOpt.getTargetStationId(),
                            ex);
                }
            }
        }
    }
}