米鼠商城

多块好省,买软件就上米鼠网

最新项目

  • 加油站系统开发

    预算:$150,000.00

    类别:软件开发>企业软件

    15355人关注
  • 筹金结算系统

    预算:$35,000.00

    类别:软件开发>其他软件开发

    15522人关注
  • 物业项目开发

    预算:$40,000.00

    类别:移动应用>其他移动应用

    17054人关注
  • CRM系统

    预算:$60,000.00

    类别:网站建设>网站开发

    19154人关注
  • 微信公众号账号开发

    预算:$18,000.00

    类别:移动应用>其他移动应用

    19371人关注
  • 网站商城开发

    预算:$50,000.00

    类别:网站建设>网站开发

    18363人关注

人才服务

靠谱的IT人才垂直招聘平台

Netty源码学习笔记之boss线程处理流程

  • darkalex
  • 5
  • 2019-06-12 15:59

server在启动的时候会开启两个线程:bossGroup和workerGroup,这两个线程分别是boss线程池(用于接收client请求)和worker线程池(用于处理具体的读写操作),这两个线程调度器都是NioEventLoopGroup,bossGroup有一个NioEventLoop,而worker线程池有n*cup数量个NioEventLoop。那么我们看看在NioEventLoop中的是如何开始的:

  NioEventLoop本质上是一个线程调度器(继承自ScheduledExecutorService),当bind之后就开始run起一个线程:  

(代码一)
 1     @Override
 2     protected void run() {
 3         for (;;) {
 4             boolean oldWakenUp = wakenUp.getAndSet(false);
 5             try {
 6                 if (hasTasks()) {
 7                     selectNow();
 8                 } else {
 9                     select(oldWakenUp);
10 
11                     if (wakenUp.get()) {
12                         selector.wakeup();
13                     }
14                 }
15 
16                 cancelledKeys = 0;
17                 needsToSelectAgain = false;
18                 final int ioRatio = this.ioRatio;
19                 if (ioRatio == 100) {
20                     processSelectedKeys();
21                     runAllTasks();
22                 } else {
23                     final long ioStartTime = System.nanoTime();
24 
25                     processSelectedKeys();
26 
27                     final long ioTime = System.nanoTime() - ioStartTime;
28                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
29                 }
30 
31                 if (isShuttingDown()) {
32                     closeAll();
33                     if (confirmShutdown()) {
34                         break;
35                     }
36                 }
37             } catch (Throwable t) {
38                 ...
39             }
40         }
41     }

 

  这个for(;;)里面就是boss线程的核心处理流程:

  【代码一主线】1,不断地监听selector拿到socket句柄然后创建channel。每次run的时候先拿到wakeup的值,并且set进去false(PS:wakeup是什么鬼?一个AtomicBoolean,代表是否用户唤醒,如果不人为将其set成true,永远是false)。

    【代码一主线】2,如果任务队列中已有任务,那么selectNow(),(PS:selectNow是什么鬼?我们知道selector.select()是一个阻塞调用,而selectNow方法是个非阻塞方法,如果没有到达的socket句柄则返回0),因此若队列中已有任务的话应该立即开始执行,而不能阻塞到selector.select()上,否则则调用select()方法,继续看select()里面:

(代码二)
 1     private void select(boolean oldWakenUp) throws IOException {
 2         Selector selector = this.selector;
 3         try {
 4             int selectCnt = 0;
 5             long currentTimeNanos = System.nanoTime();
 6             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
 7             for (;;) {
 8                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
 9                 if (timeoutMillis <= 0) {
10                     if (selectCnt == 0) {
11                         selector.selectNow();
12                         selectCnt = 1;
13                     }
14                     break;
15                 }
16 
17                 int selectedKeys = selector.select(timeoutMillis);
18                 selectCnt ++;
19 
20                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
21                     // 如果selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,则break
22                     break;
23                 }
24                 if (Thread.interrupted()) {
25                     //如果线程被中断则重置selectedKeys,同时break出本次循环,所以不会陷入一个繁忙的循环。
26                     selectCnt = 1;
27                     break;
28                 }
29 
30                 long time = System.nanoTime();
31                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
32                     // selector超时
33                     selectCnt = 1;
34                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
35                     // selector多次过早返回,重新建立并打开Selector
36                     ...
37                 }
38 
39                 currentTimeNanos = time;
40             }
41             ...
42         } catch (CancelledKeyException e) {
43             ...
44         }
45     }

   我们看到,select()方法进入一个for循环去select阻塞等待socket(这里的selector的实现在是根据操作系统和netty的版本来定的,在最新的netty中是使用的linux的epoll模型),同时入参里有“超时时间”,如果超过了这个时间仍然没有socket到来则重新将selectCnt置为1重新循环等待,直到有socket到来。如果selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,那么就是说该eventLoop有活干了,先break出去去干活,完了再打开selector重新阻塞等待。正常情况下会等待到一个socket,break出去之后回到代码一

  【代码一主线】3,根据ioRatio来选择任务执行策略(PS:ioRatio是什么鬼?看了下用途应该是这样的,这个ioRatio代表该eventLoop期望在I/O操作上花费时间的比例)。而NioEventLoop中有两类操作,一类是I/O操作(读写之类),调用processSelectedKeys;一类是非I/O操作(例如register等),调用runAllTasks。如果ioRatio是100的话那么会按照顺序执行I/O操作->非I/O操作;如果不是会按照这个比例算出一个超时时间,在run任务队列的时候如果超过了这个时间会立即返回,确保I/O操作可以得到及时的调用。

  我们关心的是I/O操作,那么进入processSelectedKeys()看下发生了什么吧。

  

(代码三)
1     private void processSelectedKeys() {
2         if (selectedKeys != null) {
3             processSelectedKeysOptimized(selectedKeys.flip());
4         } else {
5             processSelectedKeysPlain(selector.selectedKeys());
6         }
7     }

 

  正常情况下会走到processSelectedKeysOptimized中:

  

(代码四)
 1   private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
 2         for (int i = 0;; i ++) {
 3             final SelectionKey k = selectedKeys[i];
 4             if (k == null) {
 5                 break;
 6             }
 7             selectedKeys[i] = null;
 8 
 9             final Object a = k.attachment();
10 
11             if (a instanceof AbstractNioChannel) {
12                 processSelectedKey(k, (AbstractNioChannel) a);
13             } else {
14                 @SuppressWarnings("unchecked")
15                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
16                 processSelectedKey(k, task);
17             }
18 
19             if (needsToSelectAgain) {
20                 for (;;) {
21                     if (selectedKeys[i] == null) {
22                         break;
23                     }
24                     selectedKeys[i] = null;
25                     i++;
26                 }
27 
28                 selectAgain();
29                 selectedKeys = this.selectedKeys.flip();
30                 i = -1;
31             }
32         }
33     }

  

  遍历拿到所有的SelectionKey,然后判断每个SelectionKey的attachment,上篇文章中已经分析过给ServerBootstrap注册的Channel是NioServerSocketChannel(继承自AbstractNioChannel),因此进入processSelectedKey中:

 

(代码五)
 1   private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 2         final NioUnsafe unsafe = ch.unsafe();
 3         if (!k.isValid()) {
 4             unsafe.close(unsafe.voidPromise());
 5             return;
 6         }
 7 
 8         try {
 9             int readyOps = k.readyOps();
10             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
11                 unsafe.read();
12                 if (!ch.isOpen()) {
13                     return;
14                 }
15             }
16             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
17                 ch.unsafe().forceFlush();
18             }
19             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
20                 int ops = k.interestOps();
21                 ops &= ~SelectionKey.OP_CONNECT;
22                 k.interestOps(ops);
23 
24                 unsafe.finishConnect();
25             }
26         } catch (CancelledKeyException ignored) {
27             unsafe.close(unsafe.voidPromise());
28         }
29     }

  

  在这里根据传入的SelectionKey的已就绪操作类型来决定下一步的操作,如果是一个读操作,那么进入AbstractNioMessageChannel$NioMessageUnsafe的read实现,这里代码很多,我们只贴一下核心的代码:

 

(代码六)
 1         @Override
 2         public void read() {
 3             ...
 4             final ChannelPipeline pipeline = pipeline();
 5             ...
 6             try {
 7                 int size = readBuf.size();
 8                 for (int i = 0; i < size; i ++) {
 9                     pipeline.fireChannelRead(readBuf.get(i));
10                 }
11                 ...
12                 readBuf.clear();
13                 pipeline.fireChannelReadComplete();
14             } finally {
15             }
16         }

  核心就是这个pipeline.fireChannelRead(readBuf.get(i));,这已经到了pipeline阶段,可能有些人会误以为这是不是已经到了worker线程中,但是不可能啊,我们的代码其实在处于processSelectedKeys的逻辑里面。实际上,不论是boss还是worker,他们都是NioEventLoopGroup,玩法都是一样的,只不过职责不一样而已。boss也有自己的handler,上篇文章中我们提到了netty中的reactor模式的玩法,从Doug Lea的图中可以看出,boss(实际上就是mainReactor)的handler其实就是这个acceptor。

  在此我们顺便学习一下netty中的handler:

  

  从用途上来说,handler分为ChannelInboundHandler(读)和ChannelOutboundHandler(写),增加一层适配器产生了两handler的Adapter,我们使用到的类都是继承自这两个Adapter。我们经常用到的SimpleChannelInboundHandler就继承ChannelInboundHandlerAdapter,用于初始化用户handler链的ChannelInitializer和boss线程绑定的ServerBootstrapAcceptor也都继承于此。

  回到【代码六主线】我们从pipeline.fireChannelRead继续追踪下去会追到ChannelInboundHandler的channelRead的实现,而这里的Hander就是ServerBootstrapAcceptor。

(代码七)
 1         @Override
 2         @SuppressWarnings("unchecked")
 3         public void channelRead(ChannelHandlerContext ctx, Object msg) {
 4             final Channel child = (Channel) msg;
 5 
 6             child.pipeline().addLast(childHandler);
 7 
 8             for (Entry<ChannelOption<?>, Object> e: childOptions) {
 9                 try {
10                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
11                     }
12                 } catch (Throwable t) {
13                 }
14             }
15 
16             for (Entry<AttributeKey<?>, Object> e: childAttrs) {
17                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
18             }
19 
20             try {
21                 childGroup.register(child).addListener(new ChannelFutureListener() {
22                     @Override
23                     public void operationComplete(ChannelFuture future) throws Exception {
24                         if (!future.isSuccess()) {
25                             forceClose(child, future.cause());
26                         }
27                     }
28                 });
29             } catch (Throwable t) {
30                 forceClose(child, t);
31             }
32         }

  由于ServerBootstrapAcceptor 很重要,我们先看一下都有什么内容:

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
}

  我自己的理解:

  childGroup就是subReactor(也就是worker线程);childHandler就是xxx;childOptions和childAttrs是为channel准备的一些参数。

  回到【代码七主线】在这里做了3件事:

  1.为客户端channel的pipeline中添加childHandler,那么这个childHandler是什么鬼呢?回忆一下上文中的服务端启动代码,有bootStrap.childHandler(xxx)这样的代码,所以此处就是把在服务端启动时我们定义好的Handler链绑定给每个channel。

  2.把我们服务端初始化时的参数绑定到每个channel中。

  3.childGroup.register(child).addListener(new ChannelFutureListener()),后面这个异步listener作用很明确,问题是这个childGroup是什么鬼?我理解应该就是worker线程了。详细说一下childGroup.register(child),继续跟下去,跟到AbstractChannel$AbstractUnsafe中

(代码八)
 1         @Override
 2         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 3             ...
 4             AbstractChannel.this.eventLoop = eventLoop;
 5 
 6             if (eventLoop.inEventLoop()) {
 7                 register0(promise);
 8             } else {
 9                 ...
10                 } catch (Throwable t) {
11                 }
12             }
13         }

  继续register0:

(代码九)
 1     private void register0(ChannelPromise promise) {
 2             try {
 3                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
 4                     return;
 5                 }
 6                 boolean firstRegistration = neverRegistered;
 7                 doRegister();
 8                 neverRegistered = false;
 9                 registered = true;
10                 safeSetSuccess(promise);
11                 pipeline.fireChannelRegistered();
12                 if (firstRegistration && isActive()) {
13                     pipeline.fireChannelActive();
14                 }
15             } catch (Throwable t) {
16             }
17         }

  这里核心有两步:

  1.doRegister(),其实我们在上篇文章中分析过,就是将channel绑定到selector上。此处有点懵逼,我猜测是绑定到worker线程的selector中,如果有大神知道请留言我的微博。

  2.pipeline.fireChannelRegistered(),继续往下跟跟进到ChannelInboundHandler的channelRegistered方法中,而此时会调用我们定义的ChannelInitializer,将我们定义的handler注册到pipeline中。

 

  至此【代码一主线】执行完毕,我们浏览了一遍boss线程的在接收socket请求期间的处理流程,过程中是结合reactor模式去理解的,有些地方自己也有点不懂,还请各位指正。

  总结一下:

  1.boss线程就是个loop循环,打开selector -> 获得监听到的SelectionKey -> 处理I/O请求 -> 处理非I/O请求,而我们最关心的就是处理I/O请求(在processSelectedKeys()方法中完成)。

  2.遍历准备就绪的SelectionKey,根据其可操作类型(read or write。。)来决定下一步的具体操作,我们着重去了解了read逻辑。

  3.NioServerSocketChannel调用父类AbstractNioMessageChannel的unsafe类NioMessageUnsafe来处理读取逻辑:调用pipeline处理readbuf。

  4.pipeline.fireChannelRead会调用ServerBootstrapAcceptor的channelRead:初始化客户端channel参数,将该channel绑定到worker线程的selector中,为channel注册用户定义的handler链。

 

  再精炼一点:

  boss线程只是接收客户端socket并初始化客户端channle,将channel丢给acceptor,acceptor会将这个channel注册到worker线程中。整个loop过程都是一个非阻塞过程(全部异步化),同时boss中不会做耗时的I/O读取,只是将channel丢给worker。因此是一个高效的loop过程。



这里给大家推荐一个在线软件复杂项交易平台:米鼠网 https://www.misuland.com

米鼠网自成立以来一直专注于从事软件项目人才招聘软件商城等,始终秉承“专业的服务,易用的产品”的经营理念,以“提供高品质的服务、满足客户的需求、携手共创双赢”为企业目标,为中国境内企业提供国际化、专业化、个性化、的软件项目解决方案,我司拥有一流的项目经理团队,具备过硬的软件项目设计和实施能力,为全国不同行业客户提供优质的产品和服务,得到了客户的广泛赞誉。

猜你喜欢

评论留言