EventLoopGroup
EventLoopGroup 如名是EventLoop的Group 用于管理EventLoop。EventLoopGroup可以理解为一个线程池,其中的线程(EventLoop)负责处理事件和任务。
selectable
// single thread pool
NioEventLoopGroup group = new NioEventLoopGroup(1);
// loop one
NioEventLoop loop = (NioEventLoop) group.next();
try {
Channel channel = new NioServerSocketChannel();
loop.register(channel).syncUninterruptibly();
channel.bind(new InetSocketAddress(0)).syncUninterruptibly();
// config selector
SocketChannel selectableChannel = SocketChannel.open();
selectableChannel.configureBlocking(false);
selectableChannel.connect(channel.localAddress());
final CountDownLatch latch = new CountDownLatch(1);
// register key
loop.register(selectableChannel, SelectionKey.OP_CONNECT, new NioTask<SocketChannel>() {
@Override
public void channelReady(SocketChannel ch, SelectionKey key) {
latch.countDown();
}
@Override
public void channelUnregistered(SocketChannel ch, Throwable cause) {
}
});
latch.await();
selectableChannel.close();
channel.close().syncUninterruptibly();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
group.shutdownGracefully();
}
}
full param group and new task queue
final AtomicBoolean called = new AtomicBoolean();
NioEventLoopGroup group = new NioEventLoopGroup(
1, // 线程数
new ThreadPerTaskExecutor(new DefaultThreadFactory(NioEventLoopGroup.class)), // Executor
DefaultEventExecutorChooserFactory.INSTANCE, SelectorProvider.provider(), // selector
DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject(), // 拒绝策略
new EventLoopTaskQueueFactory() { // task queue factory
@Override
public Queue<Runnable> newTaskQueue(int maxCapacity) {
called.set(true);
return new LinkedBlockingQueue<Runnable>(maxCapacity);
}
});
final NioEventLoop loop = (NioEventLoop) group.next();
try {
loop.submit(new Runnable() {
@Override
public void run() {
// do some
}
}).syncUninterruptibly();
assert true == called.get();
} finally {
group.shutdownGracefully();
}
delay await cancel
EventLoopGroup group = new NioEventLoopGroup(1);
final EventLoop el = group.next();
// schedule
Future<?> future = el.schedule(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
// await
assert false == future.awaitUninterruptibly(100);
// cancel
assert true == future.cancel(true);
group.shutdownGracefully();