package com.zy.core.task;
|
|
import com.zy.asrs.entity.BasStationOpt;
|
import com.zy.asrs.service.BasStationOptService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
@Slf4j
|
@Component
|
public class BasStationOptAsyncPublisher {
|
|
private static final String LANE_NAME = "bas-station-opt-save";
|
private static final String TASK_NAME = "publish-bas-station-opt";
|
private static final long MIN_INTERVAL_MS = 0L;
|
private static final int DEFAULT_QUEUE_CAPACITY = 2048;
|
|
private final ArrayBlockingQueue<BasStationOpt> pendingQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
|
|
@Autowired
|
private MainProcessTaskSubmitter mainProcessTaskSubmitter;
|
@Autowired
|
private BasStationOptService basStationOptService;
|
|
public boolean publish(BasStationOpt basStationOpt) {
|
if (basStationOpt == null) {
|
return true;
|
}
|
if (!pendingQueue.offer(basStationOpt)) {
|
log.error("BasStationOpt async publish queue full, fallback to sync save, taskNo={}, stationId={}, targetStationId={}",
|
basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId());
|
return false;
|
}
|
mainProcessTaskSubmitter.submitSerialTask(LANE_NAME, TASK_NAME, MIN_INTERVAL_MS, this::drain);
|
return true;
|
}
|
|
private void drain() {
|
while (true) {
|
BasStationOpt basStationOpt = pendingQueue.poll();
|
if (basStationOpt == null) {
|
return;
|
}
|
try {
|
basStationOptService.save(basStationOpt);
|
} catch (Exception e) {
|
log.error("BasStationOpt async publish error, fallback to sync save next time, taskNo={}, stationId={}, targetStationId={}",
|
basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), e);
|
try {
|
basStationOptService.save(basStationOpt);
|
} catch (Exception ex) {
|
log.error("BasStationOpt sync fallback save error, taskNo={}, stationId={}, targetStationId={}",
|
basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), ex);
|
}
|
}
|
}
|
}
|
}
|