Channel
Channel 是网络传输中的实体，它代表了一个开放的连接，可以进行数据的读写操作。
channel init
// Demonstrates building an EmbeddedChannel from a ChannelInitializer and checking
// which messages an inbound handler forwards down the pipeline.
final String A = "a";
final String B = "B";
// Inbound handler: ignores the message it receives and forwards A, then B, instead.
ChannelHandler handler = new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(A);
        ctx.fireChannelRead(B);
    }
};
// Channel whose pipeline is populated by the initializer below.
EmbeddedChannel channel = new EmbeddedChannel(
    new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Parameter is named `ch` so it does not shadow the outer `channel` variable.
            ch.pipeline().addLast(handler);
        }
    }
);
// The handler added by the initializer is expected to be the first context in the pipeline.
assert handler == channel.pipeline().firstContext().handler();

// The channel operations are performed outside of `assert` so they still execute when
// the JVM runs with assertions disabled (the default); only the checks are assertions.
boolean inboundReadable = channel.writeInbound("C");
assert inboundReadable;
boolean finishedWithData = channel.finish();
assert finishedWithData;

// The single write of "C" is expected to surface as A followed by B.
Object first = channel.readInbound();
assert A.equals(first);
Object second = channel.readInbound();
assert B.equals(second);
Scheduling
// Demonstrates that tasks scheduled on an EmbeddedChannel's event loop are executed
// only when the caller drives them via runScheduledPendingTasks().
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
// Two counts: one for the scheduled task itself, one for its completion listener.
final CountDownLatch latch = new CountDownLatch(2);
// Schedule a task that becomes due one second from now.
Future<?> future = ch.eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("future");
        latch.countDown();
    }
}, 1, TimeUnit.SECONDS);
// Listen for completion of the scheduled future.
future.addListener(new FutureListener<Object>() {
    @Override
    public void operationComplete(Future<Object> completed) throws Exception {
        System.out.println("operationComplete");
        latch.countDown();
    }
});
// First pass: the task is not due yet, so a positive delay until the next task is expected.
long next = ch.runScheduledPendingTasks();
System.out.println(next);
assert next > 0;
// `next` is treated as nanoseconds: convert it to milliseconds and sleep in MILLISECONDS,
// plus 66 ms of slack for clock imprecision. Sleeping that number as nanoseconds would
// return almost immediately, leaving the task not yet due and latch.await() blocked forever.
TimeUnit.MILLISECONDS.sleep(TimeUnit.NANOSECONDS.toMillis(next) + 66);
// Second pass: run the now-due task. The call is made outside of `assert` so the task
// still runs when assertions are disabled; -1 is expected once nothing remains scheduled.
long remaining = ch.runScheduledPendingTasks();
assert remaining == -1;
System.out.println(next);
latch.await();