ByteToMessage,MessageToByte,MessageToMessage

串行的数据流的编码解码

ByteToMessage


    public static void main(String[] args) {
        testM2M();
    }
    private static void testBase(){
        // xxxx\r\nxxx\r\n
        ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
            @Override
            protected void decode(
                ChannelHandlerContext channelHandlerContext, // 上下文 
                ByteBuf byteBuf, // 数据buffer
                List<Object> list // 传给下一层的对象
                ) throws Exception {
                while (byteBuf.isReadable()) {
                    int crlfIndex = byteBuf.forEachByte(ByteProcessor.FIND_CRLF);
                    if (crlfIndex == -1) {
                        break;
                    }
                    byte[] bytes = new byte[crlfIndex - byteBuf.readerIndex()];
                    byteBuf.readBytes(bytes,0,crlfIndex - byteBuf.readerIndex());
                    byteBuf.readerIndex(crlfIndex);
                    byteBuf.readShort();
                    list.add(new String(bytes));
                }
            }
        };

MessageToMessage

        ByteToMessageDecoder decoder = new ByteToMessageDecoder() {
            @Override
            protected void decode(
                    ChannelHandlerContext channelHandlerContext, // 上下文
                    ByteBuf byteBuf, // 数据buffer
                    List<Object> list // 传给下一层的对象
            ) throws Exception {
                while (byteBuf.isReadable()) {
                    int crlfIndex = byteBuf.forEachByte(ByteProcessor.FIND_CRLF);
                    if (crlfIndex == -1) {
                        break;
                    }
                    byte[] bytes = new byte[crlfIndex - byteBuf.readerIndex()];
                    byteBuf.readBytes(bytes,0,crlfIndex - byteBuf.readerIndex());
                    byteBuf.readerIndex(crlfIndex);
                    byteBuf.readShort();
                    list.add(new String(bytes));
                }
            }
        };

        MessageToMessageDecoder decoderM2M = new MessageToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
                if (o instanceof String) {
                    list.add(String.format("[%s]",o));
                }
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(decoder);
        channel.pipeline().addLast(decoderM2M);

        channel.writeInbound(Unpooled.wrappedBuffer("hello\r\nworld\r\n".getBytes(StandardCharsets.UTF_8)));
        assert "[hello]" == (String) channel.readInbound();
        assert "[world]" == (String) channel.readInbound();

Replaying

        // replaying 可以用在一个Message再传输时不在同一时间到达的情况
        ReplayingDecoder<Void> replayingDecoder = new ReplayingDecoder<Void>() {
            @Override
            protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
                ByteBuf msg = in.readBytes(in.bytesBefore((byte) '\n'));
                out.add(msg);
                in.skipBytes(1);
            }
        };
        EmbeddedChannel ch = new EmbeddedChannel(replayingDecoder);

        ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'A'}));
        assert Objects.isNull(ch.readInbound());
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'B'}));
        assert Objects.isNull(ch.readInbound());
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'C'}));
        assert Objects.isNull(ch.readInbound());
        // 直到 \n 传到才完成一个Message的decode
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[]{'\n'}));

        ByteBuf buf = Unpooled.wrappedBuffer(new byte[] { 'A', 'B', 'C' });
        ByteBuf buf2 = ch.readInbound();

        assert buf.toString(StandardCharsets.UTF_8).equals(buf2.toString(StandardCharsets.UTF_8));

        buf2.release();
        buf.release();

        ch.finish();

Codec

        // codec 同时有decode和encode
        ByteToMessageCodec<Integer> codec = new ByteToMessageCodec<Integer>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
                out.writeInt(msg);
            }

            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                if (in.readableBytes() >= 4) {
                    out.add(in.readInt());
                }
            }
        };
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeInt(1);
        buffer.writeByte('0');

        EmbeddedChannel ch = new EmbeddedChannel(codec);
        assert ch.writeInbound(buffer);
        ch.pipeline().remove(codec);
        assert ch.finish();
        assert Integer.compare(1, (Integer) ch.readInbound()) == 0;

    }