package com.example.agvcontroller.socket; import static com.example.agvcontroller.utils.DateUtils.formatDate; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.util.Log; import com.example.agvcontroller.AGVApplication; import com.example.agvcontroller.AGVCar; import com.example.agvcontroller.MainActivity; import com.example.agvcontroller.action.AGV_11_UP; import com.example.agvcontroller.action.AckMsgBuilder; import com.example.agvcontroller.met.AbstractInboundHandler; import com.example.agvcontroller.protocol.AGV_03_UP; import com.example.agvcontroller.protocol.AGV_12_UP; import com.example.agvcontroller.protocol.AGV_13_UP; import com.example.agvcontroller.protocol.AGV_A1_DOWN; import com.example.agvcontroller.protocol.AGV_F0_DOWN; import com.example.agvcontroller.protocol.AGV_F0_UP; import com.example.agvcontroller.protocol.AgvAction; import com.example.agvcontroller.protocol.AgvPackage; import com.example.agvcontroller.protocol.ProtocolType; import com.example.agvcontroller.utils.DateUtils; import org.greenrobot.eventbus.EventBus; import java.net.InetSocketAddress; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class NettyServerHandler extends AbstractInboundHandler { private static final String TAG = "NettyServerHandler"; private static ConcurrentHashMap channelMap = new ConcurrentHashMap<>(); private Map pendingRemovals = new HashMap<>(); AGVCar agvCar; int battery = 0; int status = 0; int agvStatus = 0; String positionID = "--"; int positionX = 0; int positionY = 0; float agvAngle = 0; float gyroAngle = 0; int forkHeight = 0; int forkExtend = 0; int forkAngle = 0; int agvError = 0; String agvNo = "--"; String log; private Handler handler = new Handler(Looper.getMainLooper()) { @Override public void handleMessage(Message msg) { super.handleMessage(msg); String clientId = (String) msg.obj; removeItem(clientId); } }; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String clientId = ctx.channel().remoteAddress().toString(); InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = remoteAddress.getAddress().getHostAddress(); int port = remoteAddress.getPort(); channelMap.put(clientId, ctx.channel()); agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError); EventBus.getDefault().post(agvCar); // Log.d(TAG, "Client connected: " + clientId); log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[tcp]>>>已连接"; Log.d("updown", log); AGVApplication.addLog(log); // // 取消延迟删除操作 // if (pendingRemovals.containsKey(clientId)) { // handler.removeCallbacks(pendingRemovals.get(clientId)); // pendingRemovals.remove(clientId); // } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String clientId = ctx.channel().remoteAddress().toString(); InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = remoteAddress.getAddress().getHostAddress(); int port = remoteAddress.getPort(); // channelMap.remove(clientId); EventBus.getDefault().post(clientId); // Log.d(TAG, "Client disconnected: " + clientId); agvCar = new AGVCar(clientId, ip, port, agvNo, 0, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError); EventBus.getDefault().post(agvCar); // Log.d(TAG, "Client connected: " + clientId); log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[tcp]>>>断开连接"; Log.d("updown", log); AGVApplication.addLog(log); // // 启动延迟删除操作 // Runnable removalRunnable = new Runnable() { // @Override // public void run() { // removeItem(clientId); // } // }; // pendingRemovals.put(clientId, removalRunnable); // handler.postDelayed(removalRunnable, 20000); // 20秒后执行删除操作 } private void removeItem(String clientId) { // 原先是要删除后续为了能动态自动连接需要更新 // if (channelMap.remove(clientId) != null) { // Log.d(TAG, "Client removed after 20 seconds: " + clientId); // EventBus.getDefault().post(clientId); // } else { // Log.d(TAG, "Client already reconnected or not found: " + clientId); // } if (channelMap.remove(clientId) != null) { Log.d(TAG, "Client removed after 20 seconds: " + clientId); EventBus.getDefault().post(clientId); } else { Log.d(TAG, "Client already reconnected or not found: " + clientId); } } // @Override // public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // // 处理接收到的消息 // ByteBuf byteBuf = (ByteBuf) msg; // try { // while (byteBuf.isReadable()) { // byte[] bytes = new byte[byteBuf.readableBytes()]; // byteBuf.readBytes(bytes); // String hexString = bytesToHex(bytes); // // 获取agv信息 添加到list中 // Log.d(TAG, "ctx: " + ctx.channel().remoteAddress().toString() ); // Log.d(TAG, "Received: " + hexString); // } // } finally { // byteBuf.release(); // } // } @Override protected boolean channelRead0(ChannelHandlerContext ctx, AgvPackage pac) throws Exception { String clientId = ctx.channel().remoteAddress().toString(); InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = remoteAddress.getAddress().getHostAddress(); int port = remoteAddress.getPort(); Log.i("clientId--->",clientId); Log.i("substring",pac.toString()); String serialNum = pac.getBody().getMessageBody().getSerialNo(); Log.i("substring",serialNum); MainActivity.map.put(serialNum, Boolean.TRUE); // ack ProtocolType ackType = isNeedAck(pac); final String uniqueNo = pac.getHeader().getUniqueNo(); label : switch (pac.getHeader().getProtocolType()){ case ACTION_COMPLETE: // 动作完成数据包 AGV_11_UP agv_11_up = (AGV_11_UP) pac.getBody().getMessageBody(); // redis.push(RedisConstant.AGV_COMPLETE_FLAG, AgvProtocol.build(uniqueNo).setMessageBody(agv_11_up)); // 动作完成应答 if (null != ackType) { AgvPackage ackPac = AckMsgBuilder.ofSuccess(pac, ackType); AGV_A1_DOWN agv_a1_down = (AGV_A1_DOWN) ackPac.getBody().getMessageBody(); agv_a1_down.setAckSign((byte) agv_11_up.getCompleteCode()); ctx.writeAndFlush(ackPac); } break label; case DATA_CODE_REPORT: AGV_12_UP agv_12_up = (AGV_12_UP) pac.getBody().getMessageBody(); agvNo = pac.getHeader().getUniqueNo(); channelMap.put(clientId, ctx.channel()); agvStatus = agv_12_up.getStatus(); positionID = agv_12_up.getQrCode(); positionX = agv_12_up.getOffsetX(); positionY = agv_12_up.getOffsetY(); agvAngle = agv_12_up.getAGVCurrentAngle(); gyroAngle = agv_12_up.getGyroAngle(); forkHeight = agv_12_up.getCurrentAltitude(); forkExtend = agv_12_up.getForkLength(); forkAngle = agv_12_up.getLoaderTheta(); agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError); EventBus.getDefault().post(agvCar); log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[有码实时数据包]>>>" + pac.getSourceHexStr(); Log.d("updown", log); AGVApplication.addLog(log); break label; case DATA_WITHOUT_CODE_REPORT: AGV_13_UP agv_13_up = (AGV_13_UP) pac.getBody().getMessageBody(); agvNo = pac.getHeader().getUniqueNo(); channelMap.put(clientId, ctx.channel()); agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError); EventBus.getDefault().post(agvCar); log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss:SSS") + " 上行: " + ip + "[无码实时数据包]>>>" + pac.getSourceHexStr(); Log.d("updown", log); AGVApplication.addLog(log); break label; case HEARTBEAT_REPORT: AGV_03_UP agv_03_up = (AGV_03_UP) pac.getBody().getMessageBody(); battery = agv_03_up.getBattery(); agvError = agv_03_up.getError(); // pac.getBody().getMessageBody() agvNo = pac.getHeader().getUniqueNo(); channelMap.put(clientId, ctx.channel()); agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError); EventBus.getDefault().post(agvCar); log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[心跳包]>>>" + pac.getSourceHexStr(); Log.d("updown", log); AGVApplication.addLog(log); break label; case LOGIN_REPORT: AGV_F0_UP agv_f0_up = (AGV_F0_UP) pac.getBody().getMessageBody(); if (null != ackType) { AgvPackage ackPac = AckMsgBuilder.ofSuccess(pac, ackType); AGV_F0_DOWN agv_f0_down = (AGV_F0_DOWN) ackPac.getBody().getMessageBody(); log = formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS") + " 上行: " + ip + "[登录包]>>>" + pac.getSourceHexStr(); Log.d("updown", log); AGVApplication.addLog(log); // EventBus.getDefault().post(log); ctx.writeAndFlush(ackPac); } battery = agv_f0_up.getBattery(); // pac.getBody().getMessageBody() agvNo = pac.getHeader().getUniqueNo(); channelMap.put(clientId, ctx.channel()); agvCar = new AGVCar(clientId, ip, port, agvNo, 1, battery,agvStatus,positionID,positionX,positionY,agvAngle,gyroAngle,forkHeight,forkExtend,forkAngle,agvError); EventBus.getDefault().post(agvCar); break label; } return false; } private String bytesToHex(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) { sb.append(String.format("%02x", b)); } return sb.toString(); } // 将十六进制字符串转换为字节数组 private byte[] hexStringToByteArray(String s) { int len = s.length(); byte[] data = new byte[len / 2]; for (int i = 0; i < len; i += 2) { data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i+1), 16)); } return data; } public static void sendMessageToClient(String clientId, byte[] message) { Channel channel = channelMap.get(clientId); if (channel != null && channel.isActive()) { ByteBuf buf = Unpooled.wrappedBuffer(message); String upperCase = ByteBufUtil.hexDump(buf).toUpperCase(); Log.d(TAG, "upperCase " + upperCase); channel.writeAndFlush(buf); } else { Log.d(TAG, "Client " + clientId + " is not connected"); } } public static void sendMessageToClient(String clientId, AgvAction action) { Channel channel = channelMap.get(clientId); if (channel != null && channel.isActive()) { channel.writeAndFlush(action); } else { Log.d(TAG, "Client " + clientId + " is not connected"); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 服务器是否需要应答 */ public static ProtocolType isNeedAck(AgvPackage pac) { switch (pac.getHeader().getProtocolType()) { case ACTION_COMPLETE: return ProtocolType.ACTION_SUCCESS_ACK; case LOGIN_REPORT: return ProtocolType.LOGIN_ACK; default: return null; } } }