EventLoopGroup

EventLoopGroup 如名是EventLoop的Group 用于管理EventLoop。EventLoopGroup可以理解为一个线程池,其中的线程(EventLoop)负责处理事件和任务。

selectable

        // single thread pool
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        // loop one
        NioEventLoop loop = (NioEventLoop) group.next();
        try {

            Channel channel = new NioServerSocketChannel();
            loop.register(channel).syncUninterruptibly();
            channel.bind(new InetSocketAddress(0)).syncUninterruptibly();
            // config selector
            SocketChannel selectableChannel = SocketChannel.open();
            selectableChannel.configureBlocking(false);
            selectableChannel.connect(channel.localAddress());

            final CountDownLatch latch = new CountDownLatch(1);
            // register key
            loop.register(selectableChannel, SelectionKey.OP_CONNECT, new NioTask<SocketChannel>() {
                @Override
                public void channelReady(SocketChannel ch, SelectionKey key) {
                    latch.countDown();
                }

                @Override
                public void channelUnregistered(SocketChannel ch, Throwable cause) {
                }
            });

            latch.await();

            selectableChannel.close();
            channel.close().syncUninterruptibly();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            group.shutdownGracefully();
        }
    }

full param group and new task queue

        final AtomicBoolean called = new AtomicBoolean();

        NioEventLoopGroup group = new NioEventLoopGroup(
                1, // 线程数
                new ThreadPerTaskExecutor(new DefaultThreadFactory(NioEventLoopGroup.class)), // Executor
                DefaultEventExecutorChooserFactory.INSTANCE, SelectorProvider.provider(), // selector
                DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject(), // 拒绝策略
                new EventLoopTaskQueueFactory() { // task queue factory
                    @Override
                    public Queue<Runnable> newTaskQueue(int maxCapacity) {
                        called.set(true);
                        return new LinkedBlockingQueue<Runnable>(maxCapacity);
                    }
                });

        final NioEventLoop loop = (NioEventLoop) group.next();

        try {
            loop.submit(new Runnable() {
                @Override
                public void run() {
                    // do some
                }
            }).syncUninterruptibly();
            assert true == called.get();
        } finally {
            group.shutdownGracefully();
        }

delay await cancel

        EventLoopGroup group = new NioEventLoopGroup(1);

        final EventLoop el = group.next();
        // schedule
        Future<?> future = el.schedule(() -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        
        // await
        assert false == future.awaitUninterruptibly(100);
        // cancel
        assert true == future.cancel(true);

        group.shutdownGracefully();