package com.example.agvcontroller.protocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.util.ReferenceCountUtil; import com.example.agvcontroller.socket.RadixTools; import java.net.InetSocketAddress; import java.util.List; /** * Created by vincent on 2019-04-10 */ public class ProtocolDecoder extends ByteToMessageDecoder { private final int maxFrameLength; public ProtocolDecoder(int maxFrameLength){ this.maxFrameLength = maxFrameLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { String ip = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); // log.info("ip:{} ===>> {}", ip, ByteBufUtil.hexDump(in).toUpperCase()); int startMark = indexOfStartMark(in); if (startMark == -1){ in.skipBytes(in.readableBytes()); return; } if (in.readableBytes() > maxFrameLength) { in.skipBytes(in.readableBytes()); return; } // 去除无用前缀报文 if (startMark != 0){ in.readerIndex(startMark); in.discardReadBytes(); } // 粘包处理 in.markReaderIndex(); in.readerIndex(PackagePart.CONTENT_LENGTH.getStartIndex()); int pacTotalLen = PackagePart.START_SYMBOL.getLen() + PackagePart.CONTENT_LENGTH.getLen() + in.readShortLE() + PackagePart.VALIDE_CODE.getLen() ; in.resetReaderIndex(); ByteBuf byteBuf = in.readSlice(pacTotalLen); // 生成和初始化消息包装类 AgvPackage pac = AgvPackage.valueOfEmpty(); pac.setSourceBuff(byteBuf); // 解析 out.add(analyzeProtocol(pac)); // byteBuf.resetReaderIndex(); // tip: 如果 ridx == widx,则 refCnt = 0 in.resetReaderIndex(); in.skipBytes(pacTotalLen); // tip: This method will be called till either the input has nothing to read,无语! ReferenceCountUtil.retain(in); // 解码器自动释放 refCnt - 1 } public AgvPackage analyzeProtocol(AgvPackage pac){ ByteBuf byteBuf = pac.getSourceBuff(); PacHeader pacHeader = pac.getHeader(); // 唯一标识码 byteBuf.readerIndex(PackagePart.UNIQUENO.getStartIndex()); byte[] bytes = new byte[PackagePart.UNIQUENO.getLen()]; byteBuf.readBytes(bytes); Utils.reverse(bytes); String uniqueno = String.valueOf(RadixTools.bytesToInt(bytes)); if (!Cools.isEmpty(uniqueno)) { pacHeader.setUniqueNo(uniqueno); byteBuf.resetReaderIndex(); } else { return pac.setErrorPac(Boolean.TRUE).setPacErrorType(PacErrorType.PAC_ANALYZE_ERROR); } // 获取标识符 byte startSymbol = byteBuf.readByte(); DirectionType direction = null; if ((0xEE&0xFF) == (startSymbol&0xFF)) { pacHeader.setStartSymbol(startSymbol); direction = DirectionType.DOWN; } else if ((0xAA&0xFF) == (startSymbol&0xFF)) { pacHeader.setStartSymbol(startSymbol); direction = DirectionType.UP; } else { return pac.setErrorPac(Boolean.TRUE).setPacErrorType(PacErrorType.PAC_ANALYZE_ERROR); } // 获取协议体长度 int contentLength = byteBuf.readShortLE(); if (contentLength >= 0) { pacHeader.setContentLength(contentLength); } else { return pac.setErrorPac(Boolean.TRUE).setPacErrorType(PacErrorType.PAC_ANALYZE_ERROR); } // skip uniqueNo byteBuf.readerIndex(byteBuf.readerIndex() + PackagePart.UNIQUENO.getLen()); // 获取时间戳 byteBuf.readerIndex(PackagePart.TIMESTAMP.getStartIndex()); int timestamp = byteBuf.readIntLE(); if (timestamp > 0) { pacHeader.setTimestamp(timestamp); } else { return pac.setErrorPac(Boolean.TRUE).setPacErrorType(PacErrorType.PAC_ANALYZE_ERROR); } // 获取命令标识 int commandByte = byteBuf.readByte() & 0xFF; ProtocolType protocolType = ProtocolType.getByCode(commandByte, direction); if (null != protocolType) { pacHeader.setProtocolType(protocolType); } else { return pac.setErrorPac(Boolean.TRUE).setPacErrorType(PacErrorType.PAC_ANALYZE_ERROR); } // 获取协议体 if (contentLength > 0) { ByteBuf body = byteBuf.readSlice(pac.getHeader().getContentLength() - ( + PackagePart.UNIQUENO.getLen() + PackagePart.TIMESTAMP.getLen() + PackagePart.COMMAND_MARK.getLen() )); if (null != body && body.readableBytes() >= 0) { pac.getBody().setContent(body); } else { return pac.setErrorPac(Boolean.TRUE).setPacErrorType(PacErrorType.PAC_ANALYZE_ERROR); } } // 校验码 if(byteBuf.readableBytes() == 2) { pac.setValidCode(byteBuf.readUnsignedShortLE()); } else { return pac.setErrorPac(Boolean.TRUE).setPacErrorType(PacErrorType.PAC_ANALYZE_ERROR); } return pac; } // 获取标识位下标 private int indexOfStartMark(ByteBuf inputBuffer){ int length = inputBuffer.writerIndex(); // 报文长度至少大于2 if (length < 2) { return -1; } int readerIndex = inputBuffer.readerIndex(); for(int i = readerIndex; i < length - 1; i ++) { byte b1 = inputBuffer.getByte(i); if ((0xEE&0xFF) == (b1&0xFF) || (0xAA&0xFF) == (b1&0xFF)) { return i; } } return -1; } }