package com.example.agvcontroller.socket;
|
|
|
|
import static com.example.agvcontroller.utils.DateUtils.formatDate;
|
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBufUtil;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelHandlerContext;
|
|
import android.os.Handler;
|
import android.os.Looper;
|
import android.os.Message;
|
import android.util.Log;
|
|
import com.example.agvcontroller.AGVApplication;
|
import com.example.agvcontroller.AGVCar;
|
import com.example.agvcontroller.MainActivity;
|
import com.example.agvcontroller.action.AGV_11_UP;
|
import com.example.agvcontroller.action.AckMsgBuilder;
|
import com.example.agvcontroller.met.AbstractInboundHandler;
|
import com.example.agvcontroller.protocol.AGV_03_UP;
|
import com.example.agvcontroller.protocol.AGV_12_UP;
|
import com.example.agvcontroller.protocol.AGV_13_UP;
|
import com.example.agvcontroller.protocol.AGV_A1_DOWN;
|
import com.example.agvcontroller.protocol.AGV_F0_DOWN;
|
import com.example.agvcontroller.protocol.AGV_F0_UP;
|
import com.example.agvcontroller.protocol.AgvAction;
|
import com.example.agvcontroller.protocol.AgvPackage;
|
import com.example.agvcontroller.protocol.ProtocolType;
|
import com.example.agvcontroller.utils.DateUtils;
|
|
|
import org.greenrobot.eventbus.EventBus;
|
|
import java.net.InetSocketAddress;
|
import java.util.Date;
|
import java.util.HashMap;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
public class NettyServerHandler extends AbstractInboundHandler<AgvPackage> {
|
|
private static final String TAG = "NettyServerHandler";
|
private static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
|
private Map<String, Runnable> pendingRemovals = new HashMap<>();
|
int battery = 0;
|
int status = 0;
|
int agvStatus = 0;
|
String positionID = "--";
|
int positionX = 0;
|
int positionY = 0;
|
float agvAngle = 0;
|
float gyroAngle = 0;
|
int forkHeight = 0;
|
int forkExtend = 0;
|
int forkAngle = 0;
|
int agvError = 0;
|
private Handler handler = new Handler(Looper.getMainLooper()) {
|
@Override
|
public void handleMessage(Message msg) {
|
super.handleMessage(msg);
|
String clientId = (String) msg.obj;
|
removeItem(clientId);
|
}
|
};
|
|
|
@Override
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
String clientId = ctx.channel().remoteAddress().toString();
|
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
String ip = remoteAddress.getAddress().getHostAddress();
|
int port = remoteAddress.getPort();
|
channelMap.put(clientId, ctx.channel());
|
EventBus.getDefault().post(new AGVCar(clientId,ip,port,"--",0));
|
Log.d(TAG, "Client connected: " + clientId);
|
|
// 取消延迟删除操作
|
if (pendingRemovals.containsKey(clientId)) {
|
handler.removeCallbacks(pendingRemovals.get(clientId));
|
pendingRemovals.remove(clientId);
|
}
|
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
String clientId = ctx.channel().remoteAddress().toString();
|
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
String ip = remoteAddress.getAddress().getHostAddress();
|
int port = remoteAddress.getPort();
|
channelMap.remove(clientId);
|
EventBus.getDefault().post(clientId);
|
Log.d(TAG, "Client disconnected: " + clientId);
|
|
// 启动延迟删除操作
|
Runnable removalRunnable = new Runnable() {
|
@Override
|
public void run() {
|
removeItem(clientId);
|
}
|
};
|
pendingRemovals.put(clientId, removalRunnable);
|
handler.postDelayed(removalRunnable, 20000); // 20秒后执行删除操作
|
|
}
|
|
|
private void removeItem(String clientId) {
|
if (channelMap.remove(clientId) != null) {
|
Log.d(TAG, "Client removed after 20 seconds: " + clientId);
|
EventBus.getDefault().post(clientId);
|
} else {
|
Log.d(TAG, "Client already reconnected or not found: " + clientId);
|
}
|
}
|
|
// @Override
|
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
// // 处理接收到的消息
|
// ByteBuf byteBuf = (ByteBuf) msg;
|
// try {
|
// while (byteBuf.isReadable()) {
|
// byte[] bytes = new byte[byteBuf.readableBytes()];
|
// byteBuf.readBytes(bytes);
|
// String hexString = bytesToHex(bytes);
|
// // 获取agv信息 添加到list中
|
// Log.d(TAG, "ctx: " + ctx.channel().remoteAddress().toString() );
|
// Log.d(TAG, "Received: " + hexString);
|
// }
|
// } finally {
|
// byteBuf.release();
|
// }
|
// }
|
|
@Override
|
protected boolean channelRead0(ChannelHandlerContext ctx, AgvPackage pac) throws Exception {
|
String clientId = ctx.channel().remoteAddress().toString();
|
InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
String ip = remoteAddress.getAddress().getHostAddress();
|
int port = remoteAddress.getPort();
|
Log.i("clientId--->",clientId);
|
Log.i("substring",pac.toString());
|
String serialNum = pac.getBody().getMessageBody().getSerialNo();
|
Log.i("substring",serialNum);
|
MainActivity.map.put(serialNum, Boolean.TRUE);
|
// ack
|
ProtocolType ackType = isNeedAck(pac);
|
final String uniqueNo = pac.getHeader().getUniqueNo();
|
String agvNo;
|
AGVCar agvCar;
|
|
String log;
|
|
label : switch (pac.getHeader().getProtocolType()){
|
case ACTION_COMPLETE: // 动作完成数据包
|
AGV_11_UP agv_11_up = (AGV_11_UP) pac.getBody().getMessageBody();
|
// redis.push(RedisConstant.AGV_COMPLETE_FLAG, AgvProtocol.build(uniqueNo).setMessageBody(agv_11_up));
|
// 动作完成应答
|
if (null != ackType) {
|
AgvPackage ackPac = AckMsgBuilder.ofSuccess(pac, ackType);
|
AGV_A1_DOWN agv_a1_down = (AGV_A1_DOWN) ackPac.getBody().getMessageBody();
|
agv_a1_down.setAckSign((byte) agv_11_up.getCompleteCode());
|
ctx.writeAndFlush(ackPac);
|
}
|
break label;
|
case DATA_CODE_REPORT:
|
AGV_12_UP agv_12_up = (AGV_12_UP) pac.getBody().getMessageBody();
|
agvNo = pac.getHeader().getUniqueNo();
|
channelMap.put(clientId, ctx.channel());
|
agvStatus = agv_12_up.getStatus();
|
positionID = agv_12_up.getQrCode();
|
positionX = agv_12_up.getOffsetX();
|
positionY = agv_12_up.getOffsetY();
|
agvAngle = agv_12_up.getAGVCurrentAngle();
|
gyroAngle = agv_12_up.getGyroAngle();
|
forkHeight = agv_12_up.getCurrentAltitude();
|
forkExtend = agv_12_up.getForkLength();
|
forkAngle = agv_12_up.getLoaderTheta();
|
agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError);
|
EventBus.getDefault().post(agvCar);
|
log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[有码实时数据包]>>>" + pac.getSourceHexStr();
|
Log.d("updown", log);
|
AGVApplication.addLog(log);
|
break label;
|
case DATA_WITHOUT_CODE_REPORT:
|
AGV_13_UP agv_13_up = (AGV_13_UP) pac.getBody().getMessageBody();
|
agvNo = pac.getHeader().getUniqueNo();
|
channelMap.put(clientId, ctx.channel());
|
agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError);
|
EventBus.getDefault().post(agvCar);
|
log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss:SSS") + " 上行: " + ip + "[无码实时数据包]>>>" + pac.getSourceHexStr();
|
Log.d("updown", log);
|
AGVApplication.addLog(log);
|
break label;
|
case HEARTBEAT_REPORT:
|
AGV_03_UP agv_03_up = (AGV_03_UP) pac.getBody().getMessageBody();
|
battery = agv_03_up.getBattery();
|
agvError = agv_03_up.getError();
|
// pac.getBody().getMessageBody()
|
agvNo = pac.getHeader().getUniqueNo();
|
channelMap.put(clientId, ctx.channel());
|
agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError);
|
EventBus.getDefault().post(agvCar);
|
log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[心跳包]>>>" + pac.getSourceHexStr();
|
Log.d("updown", log);
|
AGVApplication.addLog(log);
|
break label;
|
case LOGIN_REPORT:
|
AGV_F0_UP agv_f0_up = (AGV_F0_UP) pac.getBody().getMessageBody();
|
if (null != ackType) {
|
AgvPackage ackPac = AckMsgBuilder.ofSuccess(pac, ackType);
|
AGV_F0_DOWN agv_f0_down = (AGV_F0_DOWN) ackPac.getBody().getMessageBody();
|
log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[登录包]>>>" + pac.getSourceHexStr();
|
Log.d("updown", log);
|
AGVApplication.addLog(log);
|
|
// EventBus.getDefault().post(log);
|
|
ctx.writeAndFlush(ackPac);
|
}
|
battery = agv_f0_up.getBattery();
|
// pac.getBody().getMessageBody()
|
agvNo = pac.getHeader().getUniqueNo();
|
channelMap.put(clientId, ctx.channel());
|
agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError);
|
EventBus.getDefault().post(agvCar);
|
break label;
|
|
}
|
return false;
|
}
|
|
private String bytesToHex(byte[] bytes) {
|
StringBuilder sb = new StringBuilder();
|
for (byte b : bytes) {
|
sb.append(String.format("%02x", b));
|
}
|
return sb.toString();
|
}
|
|
// 将十六进制字符串转换为字节数组
|
private byte[] hexStringToByteArray(String s) {
|
int len = s.length();
|
byte[] data = new byte[len / 2];
|
for (int i = 0; i < len; i += 2) {
|
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
|
+ Character.digit(s.charAt(i+1), 16));
|
}
|
return data;
|
}
|
|
public static void sendMessageToClient(String clientId, byte[] message) {
|
Channel channel = channelMap.get(clientId);
|
if (channel != null && channel.isActive()) {
|
ByteBuf buf = Unpooled.wrappedBuffer(message);
|
String upperCase = ByteBufUtil.hexDump(buf).toUpperCase();
|
Log.d(TAG, "upperCase " + upperCase);
|
channel.writeAndFlush(buf);
|
} else {
|
Log.d(TAG, "Client " + clientId + " is not connected");
|
}
|
}
|
|
public static void sendMessageToClient(String clientId, AgvAction<?> action) {
|
|
|
|
Channel channel = channelMap.get(clientId);
|
if (channel != null && channel.isActive()) {
|
|
channel.writeAndFlush(action);
|
} else {
|
Log.d(TAG, "Client " + clientId + " is not connected");
|
}
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
cause.printStackTrace();
|
ctx.close();
|
}
|
|
/**
|
* 服务器是否需要应答
|
*/
|
public static ProtocolType isNeedAck(AgvPackage pac) {
|
switch (pac.getHeader().getProtocolType()) {
|
case ACTION_COMPLETE:
|
return ProtocolType.ACTION_SUCCESS_ACK;
|
case LOGIN_REPORT:
|
return ProtocolType.LOGIN_ACK;
|
default:
|
return null;
|
}
|
}
|
|
}
|