package com.zy.core.netty.handle; import com.core.common.SnowflakeIdWorker; import com.zy.core.netty.constant.Constant; import com.zy.core.netty.domain.ChPackage; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.List; /** * Created by vincent on 2019-04-10 */ public class ProtocolDecoder extends ByteToMessageDecoder { private final SnowflakeIdWorker snowflakeIdWorker; public ProtocolDecoder(SnowflakeIdWorker snowflakeIdWorker) { this.snowflakeIdWorker = snowflakeIdWorker; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List list) throws Exception { int startMark = indexOfStartMark(in); if (startMark == -1) { return; } // 去除无用前缀报文 if (startMark != 0) { in.readerIndex(startMark); in.discardReadBytes(); } // 生成和初始化消息包装类 String ip = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); ChPackage pac = ChPackage.valueOfEmpty(String.valueOf(snowflakeIdWorker.nextId()), ip); pac.setSourceBuff(in); // 解析 list.add(analyzeProtocol(pac)); } public ChPackage analyzeProtocol(ChPackage pac) { ByteBuf byteBuf = pac.getSourceBuff(); // 备份缓冲区 ByteBuf body = byteBuf.duplicate(); if (null != body && body.readableBytes() >= 0) { pac.setContent(body); } else { return null; } // 字节数组 body.resetReaderIndex(); byte[] bytes = new byte[body.readableBytes()]; body.readBytes(bytes); body.resetReaderIndex(); pac.setBytes(bytes); // ascii if (bytes.length > 0) { pac.setAscii(new String(pac.getBytes(), StandardCharsets.US_ASCII)); } // 备份字符串 if (null != pac.getSourceBuff() && null == pac.getSourceHexStr()) { pac.getSourceBuff().resetReaderIndex(); pac.setSourceHexStr(ByteBufUtil.hexDump(pac.getSourceBuff())); pac.getSourceBuff().resetReaderIndex(); } byteBuf.skipBytes(byteBuf.readableBytes()); // pac.getSourceBuff().readByte(); return pac; } // 获取标识位下标 private int indexOfStartMark(ByteBuf inputBuffer) { int length = inputBuffer.writerIndex(); // 报文长度至少大于2 if (length < 2) { return -1; } int readerIndex = inputBuffer.readerIndex(); for (int i = readerIndex; i < length - 1; i++) { byte b1 = inputBuffer.getByte(i); // "#" = b1 if (0x23 == b1) { // "#" = b2 if (i + 1 <= length && 0x23 == inputBuffer.getByte(i + 1)) { return i; } } } return -1; } private String byte2Str(ByteBuf buf, int len) { byte[] bytes = new byte[len]; buf.readBytes(bytes); return new String(bytes, Constant.CHARSET_GBK); } }