Java教程

Netty网络框架学习笔记-18(NioEventLoop源码与处理器异步任务分析_2020.06.25)

本文主要是介绍Netty网络框架学习笔记-18(NioEventLoop源码与处理器异步任务分析_2020.06.25),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言:

编写netty网络服务器的时候, 第一行代码, 就是创建线程组 NioEventLoopGroup bossGroup = new NioEventLoopGroup()

下面就来分析下, 其中一个 NioEventLoop

NioEventLoop关系

jk0pKx.png

  • 说明
  1. ScheduledExecutorService 接口表示是一个定时任务接口,EventLoop 可以接受定时任务。
  2. EventLoop 接口:Netty 接口文档说明该接口作用:一旦 Channel 注册了,就处理该 Channel 对应的所有 I/O 操作。
  3. SingleThreadEventExecutor 表示这是一个单个线程的线程池
  4. EventLoop 是一个单例的线程池,里面含有一个死循环的线程不断的做着 3 件事情:监听端口,处理端口事件,处理队列事件。每个 EventLoop 都可以绑定多个 Channel,而每个 Channel 始终只能由一个 EventLoop 来处理

NioEventLoop的创建

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

super => 创建 SingleThreadEventLoop 单线程

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
     this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
 }

NioEventLoop-execute方法

NioEventLoop本身并没有实现该方法, 所以调用的是父类 SingleThreadEventExecutor # execute

private void execute(Runnable task, boolean immediate) {
    // 判断当前线程是不是 EventLoop 线程
        boolean inEventLoop = inEventLoop();
    // 添加到任务队列里面
        addTask(task);
    // 如果不是当前线程, 则新启动一个线程
        if (!inEventLoop) {
            startThread();
            // 如果线程已经停止,并且删除任务成功,则执行拒绝策略,默认是抛出RejectedExecutionException异常。
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
		// 如果 addTaskWakesUp 是 false,并且任务不是 NonWakeupRunnable 类型的,
    	// 就尝试唤醒 selector。这 个时候,阻塞在 selecor 的线程就会立即返回
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

doStartThread

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 执行run方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        confirmShutdown();
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.countDown();
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
  1. 首先调用 executor 的 execute 方法,这个 executor 就是在创建 Event LoopGroup 的时候创建的 ThreadPerTaskExecutor 类。该 execute 方法会将 Runnable 包装成 Netty 的 FastThreadLocalThread。
  2. 任务中,首先判断线程中断状态,然后设置最后一次的执行时间。
  3. 执行当前 NioEventLoop 的 run 方法,注意:这个方法是个死循环,是整个 EventLoop 的核心
  4. 在 finally 块中,使用 CAS 不断修改 state 状态,改成 ST_SHUTTING_DOWN。也就是当线程 Loop 结束的时候。关闭线程。最后还要死循环确认是否关闭,否则不会 break。然后,执行 cleanup 操作,更新状态为ST_TERMINATED,并释放当前线程锁。如果任务队列不是空,则打印队列中还有多少个未完成的任务。 并回调 terminationFuture 方法。
  5. 其实最核心的就是 Event Loop 自身的 run 方法。再继续深入 run 方法

jky9EV.png

EventLoop 作为 Netty 的核心的运行机制总结

  1. 每次执行 execute 方法都是向队列中添加任务。当第一次添加时就启动线程,执行 run 方法,而 run 方法是整个 EventLoop 的核心,就像 EventLoop 的名字一样,Loop Loop ,不停的 Loop ,Loop 做什么呢?做 3 件 事情。
  • 调用 selector 的 select 方法,默认阻塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上在加上 0.5 秒进行阻塞。当执行 execute 方法的时候,也就是添加任务的时候,唤醒 selecor,防止 selecotr 阻塞时间过长。

  • 当 selector 返回的时候,回调用 processSelectedKeys 方法对 selectKey 进行处理。

  • 当 processSelectedKeys 方法执行结束后,则按照 ioRatio 的比例执行 runAllTasks 方法,默认是 IO 任务时间和非 IO 任务时间是相同的,你也可以根据你的应用特点进行调优 。比如 非 IO 任务比较多,那么你就将ioRatio调小一点,这样非 IO 任务就能执行的长一点。防止队列积攒过多的任务。

handler中加入线程池和Context 中添加线程池分析

在 Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响 Netty 对 Socket 的处理速度。

而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有 2 种方式,而且这 2 种方式实现的区别也蛮大的。

第一种handler中加入线程池:

服务端与客户端启动都是一样的, 处理器中内容不一样, 这里只显示不同处

假设在自定义服务端处理器中这样写:

将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 IO 线程。

private static final EventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(16);
private static final EventExecutor unorderedThreadPoolEventExecutor = new UnorderedThreadPoolEventExecutor(16);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("NettyGroupServiceHandle-读取到信息:{}", msg);
        log.info("NettyGroupServiceHandle-读取到多少次信息:{}", ++count);
        defaultEventExecutorGroup.execute(() -> {
            String name = Thread.currentThread().getName();
            log.info("NettyGroupServiceHandle===线程名称:{}, 进行一个耗时10秒的任务",name);
            ThreadUtil.sleep(10, TimeUnit.SECONDS);
            log.info("NettyGroupServiceHandle===线程名称:{}, 一个耗时10秒的任务完成",name);
            ctx.writeAndFlush(String.format("线程%s, 10秒任务完成了!!!, 时间戳: %s",name,System.currentTimeMillis()));
        });

        unorderedThreadPoolEventExecutor.execute(() -> {
            String name = Thread.currentThread().getName();
            log.info("NettyGroupServiceHandle===线程名称:{}, 进行一个耗时10秒的任务", name);
            ThreadUtil.sleep(10, TimeUnit.SECONDS);
            log.info("NettyGroupServiceHandle===线程名称:{}, 一个耗时10秒的任务完成",name);
            ctx.writeAndFlush(String.format("线程%s, 10秒任务完成了!!!, 时间戳: %s",name,System.currentTimeMillis()));
        });
    }

客户端处理器

@Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("NettyGroupClientHandle-读取到信息:{}", msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 通道激活, 模拟3个几乎同时发送消息的请求
        EventExecutor executor = ctx.executor();
        executor.schedule(()-> sendUUID(ctx),1000, TimeUnit.MILLISECONDS);
        executor.schedule(()-> sendUUID(ctx),900, TimeUnit.MILLISECONDS);
        executor.schedule(()-> sendUUID(ctx),800, TimeUnit.MILLISECONDS);
    }
    
    private void sendUUID(ChannelHandlerContext ctx) {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(UUID.fastUUID() +"||");
        }
    }

结果:

## 服务端日志
15:12:55.639 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到信息:172f832a-f7a0-42ae-9dc0-54f866e0cbc8
15:12:55.639 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到多少次信息:1
15:12:55.665 [defaultEventExecutorGroup-4-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-1, 进行一个耗时10秒的任务
15:12:55.667 [unorderedThreadPoolEventExecutor-5-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-1, 进行一个耗时10秒的任务
15:12:55.704 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到信息:b9f07858-92c6-47d7-b1b4-0b2835398641
15:12:55.704 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到多少次信息:2
15:12:55.704 [defaultEventExecutorGroup-4-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-2, 进行一个耗时10秒的任务
15:12:55.704 [unorderedThreadPoolEventExecutor-5-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-2, 进行一个耗时10秒的任务
15:12:55.814 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到信息:1a16bfdb-fc0d-47a3-a7e3-525db2b93606
15:12:55.814 [nioEventLoopGroup-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle-读取到多少次信息:3
15:12:55.814 [defaultEventExecutorGroup-4-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-3, 进行一个耗时10秒的任务
15:12:55.814 [unorderedThreadPoolEventExecutor-5-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-3, 进行一个耗时10秒的任务
### --------------------------------------------------------------------------------------
15:13:05.670 [unorderedThreadPoolEventExecutor-5-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-1, 一个耗时10秒的任务完成
15:13:05.670 [defaultEventExecutorGroup-4-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-1, 一个耗时10秒的任务完成
15:13:05.716 [defaultEventExecutorGroup-4-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-2, 一个耗时10秒的任务完成
15:13:05.716 [unorderedThreadPoolEventExecutor-5-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-2, 一个耗时10秒的任务完成
15:13:05.824 [defaultEventExecutorGroup-4-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:defaultEventExecutorGroup-4-3, 一个耗时10秒的任务完成
15:13:05.824 [unorderedThreadPoolEventExecutor-5-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandle - NettyGroupServiceHandle===线程名称:unorderedThreadPoolEventExecutor-5-3, 一个耗时10秒的任务完成
### 客户端日志
15:13:05.676 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-5-1, 10秒任务完成了!!!, 时间戳: 1656400385670线程defaultEventExecutorGroup-4-1, 10秒任务完成了!!!, 时间戳: 1656400385670
15:13:05.717 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-4-2, 10秒任务完成了!!!, 时间戳: 1656400385716线程unorderedThreadPoolEventExecutor-5-2, 10秒任务完成了!!!, 时间戳: 1656400385716
15:13:05.825 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-5-3, 10秒任务完成了!!!, 时间戳: 1656400385824线程defaultEventExecutorGroup-4-3, 10秒任务完成了!!!, 时间戳: 1656400385824

在 channelRead 方法,模拟了一个耗时 10 秒的操作,我们将这个任务提交到了一个自定义的业务线程池中,这样,就不会阻塞 Netty 的 处理请求的 IO 线程。

当IO线程轮询到一个socket事件,然后,IO线程开始处理,当走到耗时handler的时候,将耗时任务交给业务线程池。

当耗时任务执行完毕再执行ctx.writeAndFlush方法的时候,会将这个任务交给处理 IO 线程。

jeVlqI.png

第二种添加处理器时添加线程池

private static final EventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(16);
private static final EventExecutor unorderedThreadPoolEventExecutor = new UnorderedThreadPoolEventExecutor(16);

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap = bootstrap.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .handler(new LoggingHandler(LogLevel.INFO)) // boosGroup服务器处理程序, 日志
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 添加框架提供的字符串编解码处理器
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        // 添加自己的处理器
                        pipeline.addLast(defaultEventExecutorGroup,"one",new NettyGroupServiceHandleTwo());
                        //pipeline.addLast(unorderedThreadPoolEventExecutor,"two",new NettyGroupServiceHandleTwo());
                    }
                });

        try {
            ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress("127.0.0.1", 8888)).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

服务端处理器

public class NettyGroupServiceHandleTwo extends SimpleChannelInboundHandler<String> {

    private static int count = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String name = Thread.currentThread().getName();
        log.info("NettyGroupServiceHandleTwo===线程名称:{}, 进行一个耗时10秒的任务", name);
        ThreadUtil.sleep(10, TimeUnit.SECONDS);
        log.info("NettyGroupServiceHandleTwo===线程名称:{}, 一个耗时10秒的任务完成", name);
        ctx.writeAndFlush(String.format("线程%s, 10秒任务完成了!!!, 时间戳: %s", name, System.currentTimeMillis()));
        // 传递给下一个处理器
        ctx.fireChannelRead(msg);
    }
}

使用DefaultEventExecutorGroup 结果:

15:52:52.149 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 进行一个耗时10秒的任务
15:53:02.161 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 一个耗时10秒的任务完成

15:53:02.163 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 进行一个耗时10秒的任务
15:53:12.176 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 一个耗时10秒的任务完成

15:53:12.176 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 进行一个耗时10秒的任务
15:53:22.186 [defaultEventExecutorGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:defaultEventExecutorGroup-2-1, 一个耗时10秒的任务完成
### 客户端日志
15:53:02.167 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-2-1, 10秒任务完成了!!!, 时间戳: 1656402782161
15:53:12.176 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-2-1, 10秒任务完成了!!!, 时间戳: 1656402792176
15:53:22.186 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程defaultEventExecutorGroup-2-1, 10秒任务完成了!!!, 时间戳: 1656402802186

从结果看出, 客户端是同一个的情况下, 无论是同时并发写的情况下, 都是使用线程池中的一个线程进行处理, 并且需等上一个通道中的线程执行完毕, 在处理下一个, 这就很尴尬了!

使用pipeline.addLast(unorderedThreadPoolEventExecutor,"two",new NettyGroupServiceHandleTwo()); 结果:

15:56:35.712 [unorderedThreadPoolEventExecutor-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-1, 进行一个耗时10秒的任务
15:56:35.776 [unorderedThreadPoolEventExecutor-3-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-3, 进行一个耗时10秒的任务
15:56:35.886 [unorderedThreadPoolEventExecutor-3-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-2, 进行一个耗时10秒的任务

15:56:45.722 [unorderedThreadPoolEventExecutor-3-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-1, 一个耗时10秒的任务完成
15:56:45.785 [unorderedThreadPoolEventExecutor-3-3] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-3, 一个耗时10秒的任务完成
15:56:45.896 [unorderedThreadPoolEventExecutor-3-2] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupServiceHandleTwo - NettyGroupServiceHandleTwo===线程名称:unorderedThreadPoolEventExecutor-3-2, 一个耗时10秒的任务完成
### 客户端日志
15:56:45.728 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-3-1, 10秒任务完成了!!!, 时间戳: 1656403005723
15:56:45.786 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-3-3, 10秒任务完成了!!!, 时间戳: 1656403005785
15:56:45.897 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.tcp.eventExecutorGroup.NettyGroupClientHandle - NettyGroupClientHandle-读取到信息:线程unorderedThreadPoolEventExecutor-3-2, 10秒任务完成了!!!, 时间戳: 1656403005896

从结果可以看出来, 在客户端多次调用写的时候, 服务端并不会把 线程池中的某一个线程和管道绑定, 而是每次客户端新写的时候, 都会使用一个不同的线程来处理, 由于使用的是不同的线程, 根据不存在需要等待上一个线程处理完毕在处理下一个, 大大提高了系统的并发能力。

然后就是, ctx.writeAndFlush() 方法, 和上面debug的结果是一样的,

不同的是读取消息的时候, 会不同下面可以debug看看

jeuevF.png

当我们在调用addLast方法添加线程池后,handler将优先使用这个线程池,如果不添加,将使用处理请求的 IO线程池

当走到AbstractChannelHandlerContext的invokeChannelRead方法的时候,executor.inEventLoop()是不会通过的,因为当前线程是IO线程context(t也就是Handler)的executor是业务线程,所以会异步执行

两种方式比较

  • 第一种

handler中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需要,就不异步,异步会拖长接口响应时间。因为需要将任务放进mpscTask中。如果IO时间很短,task很多,可能一个循环下来,都没时间执行整个task,导致响应时间达不到指标。

  • 第二种

是Netty标准方式(即加入到队列),但是,这么做会将整个handler都交给业务线程池。不论耗时不耗时,都加入到队列里,不够灵活。

最终根据业务方案自定义选择。

  • 问题: 为什么不使用JDK提供的线程池来做额外耗时的业务处理?

因为netty设计线程组的目的就是去掉JDK锁竞争, 无锁化运行

netty中文档也建议使用对应提供的线程组

jZxfsS.png

1

这篇关于Netty网络框架学习笔记-18(NioEventLoop源码与处理器异步任务分析_2020.06.25)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!