ByteToMessage,MessageToByte,MessageToMessage
串行的数据流的编码解码
ByteToMessage
public static void main(String[] args) {
testM2M();
}
private static void testBase(){
// xxxx\r\nxxx\r\n
ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
@Override
protected void decode(
ChannelHandlerContext channelHandlerContext, // 上下文
ByteBuf byteBuf, // 数据buffer
List<Object> list // 传给下一层的对象
) throws Exception {
while (byteBuf.isReadable()) {
int crlfIndex = byteBuf.forEachByte(ByteProcessor.FIND_CRLF);
if (crlfIndex == -1) {
break;
}
byte[] bytes = new byte[crlfIndex - byteBuf.readerIndex()];
byteBuf.readBytes(bytes,0,crlfIndex - byteBuf.readerIndex());
byteBuf.readerIndex(crlfIndex);
byteBuf.readShort();
list.add(new String(bytes));
}
}
};
MessageToMessage
ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
@Override
protected void decode(
ChannelHandlerContext channelHandlerContext, // 上下文
ByteBuf byteBuf, // 数据buffer
List<Object> list // 传给下一层的对象
) throws Exception {
while (byteBuf.isReadable()) {
int crlfIndex = byteBuf.forEachByte(ByteProcessor.FIND_CRLF);
if (crlfIndex == -1) {
break;
}
byte[] bytes = new byte[crlfIndex - byteBuf.readerIndex()];
byteBuf.readBytes(bytes,0,crlfIndex - byteBuf.readerIndex());
byteBuf.readerIndex(crlfIndex);
byteBuf.readShort();
list.add(new String(bytes));
}
}
};
MessageToMessageDecoder decoderM2M = new MessageToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
if (o instanceof String) {
list.add(String.format("[%s]",o));
}
}
};
EmbeddedChannel channel = new EmbeddedChannel(decoder);
channel.pipeline().addLast(decoderM2M);
channel.writeInbound(Unpooled.wrappedBuffer("hello\r\nworld\r\n".getBytes(StandardCharsets.UTF_8)));
assert "[hello]" == (String) channel.readInbound();
assert "[world]" == (String) channel.readInbound();
Replaying
// replaying 可以用在一个Message再传输时不在同一时间到达的情况
ReplayingDecoder<Void> replayingDecoder = new ReplayingDecoder<Void>() {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
ByteBuf msg = in.readBytes(in.bytesBefore((byte) '\n'));
out.add(msg);
in.skipBytes(1);
}
};
EmbeddedChannel ch = new EmbeddedChannel(replayingDecoder);
ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'A'}));
assert Objects.isNull(ch.readInbound());
ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'B'}));
assert Objects.isNull(ch.readInbound());
ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'C'}));
assert Objects.isNull(ch.readInbound());
// 直到 \n 传到才完成一个Message的decode
ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'\n'}));
ByteBuf buf = Unpooled.wrappedBuffer(new byte[] { 'A', 'B', 'C' });
ByteBuf buf2 = ch.readInbound();
assert buf.toString(StandardCharsets.UTF_8).equals(buf2.toString(StandardCharsets.UTF_8));
buf2.release();
buf.release();
ch.finish();
Codec
// codec 同时有decode和encode
ByteToMessageCodec<Integer> codec = new ByteToMessageCodec<Integer>() {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
out.writeInt(msg);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
};
ByteBuf buffer = Unpooled.buffer();
buffer.writeInt(1);
buffer.writeByte('0');
EmbeddedChannel ch = new EmbeddedChannel(codec);
assert ch.writeInbound(buffer);
ch.pipeline().remove(codec);
assert ch.finish();
assert Integer.compare(1, (Integer) ch.readInbound()) == 0;
}