通过Tomcat的Http11NioProtocol源码学习Java NIO设计

Tomcat的Http11NioProtocol协议使用Java NIO技术实现高性能Web服务器。本文通过分析Http11NioProtocol源码来学习Java NIO的使用。从中可以了解到阻塞IO和非阻塞IO的配合,NIO的读写操作以及Selector.wakeup的使用。

1. 初始化阶段

Java NIO服务器端实现的第一步是开启一个新的ServerSocketChannel对象。Http11NioProtocol的实现也不例外, 在NioEndPoint类的init方法可以看到这段代码。

代码1:NioEndPoint.init()方法

public void init()
    throws Exception {

    if (initialized )
        return;
    //开启一个新的ServerSocketChannel
    serverSock = ServerSocketChannel.open();
    //设置socket的性能偏好
    serverSock.socket().setPerformancePreferences(socketProperties .getPerformanceConnectionTime(),
                                                  socketProperties.getPerformanceLatency(),
                                                  socketProperties.getPerformanceBandwidth());
    InetSocketAddress addr = ( address!=null ?new InetSocketAddress(address ,port ):new InetSocketAddress(port));
    //绑定端口号,并设置backlog
    serverSock.socket().bind(addr,backlog );
    //将serverSock设置成阻塞IO
    serverSock.configureBlocking(true); //mimic APR behavior

    //初始化acceptor线程数
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    //初始化poller线程数
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }

    // 根据需要,初始化SSL
    // 因为主要关注Java NIO, 所以这一块代码就省略掉了
    if (isSSLEnabled()) {
       ......
    }
    //OutOfMemoryError策略
    if (oomParachute >0) reclaimParachute(true);

    //开启NioSelectorPool
    selectorPool.open();
    initialized = true ;
}

在NioEndPoint.init方法中,可以看到ServerSocketChannel被设置成阻塞IO,并且没有注册任何就绪事件。这样可以和阻塞ServerSocket一样方便地使用阻塞accept方法来接收客户端新来的连接。但不同的是当NioEndPoint.Accept线程通过accept方法获得一个新的SocketChannel后会构建一个OP_REGISTER类型的PollerEvent事件并放到Poller.events队列中。而我们使用ServerSocket实现服务器的时候,在接收到新连接后,一般是从线程池中取出一个线程来处理这个连接。

在NioEndPoint.Accept的setSocketOptions方法中可以看到获得SocketChannel后的处理过程。步骤如下:

1)将SocketChannel设置成非阻塞;

2)构建OP_REGISTER类型的PollerEvent对象,并放入到Poller.events队列中。

代码2:NioEndPoint.Accept类的setSocketOptions方法

protected boolean setSocketOptions(SocketChannel socket) {
    try {
       //将客户端Socket设置为非阻塞, APR风格
        socket.configureBlocking( false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        //从缓存中取NioChannel对象,如果取不到直接构建一个
        NioChannel channel = nioChannels.poll();
        if ( channel == null ) {
            // 如果sslContext不等于null, 需要启动ssl
            if (sslContext != null) {
                ....
            }
            //正常tcp启动
            else {
                //构建NioBufferHandler对象
                NioBufferHandler bufhandler = new NioBufferHandler(socketProperties .getAppReadBufSize(),
                                                                   socketProperties.getAppWriteBufSize(),
                                                                   socketProperties.getDirectBuffer());
                //构建NioChannel对象
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            //从缓存中取的NioChannel对象,将客户端socket设置进去
            channel.setIOChannel(socket);
            if ( channel instanceof SecureNioChannel ) {
                SSLEngine engine = createSSLEngine();
                ((SecureNioChannel)channel).reset(engine);
            } else {
                channel.reset();
            }
        }
        //注册NioChannel对象
        getPoller0().register(channel);
    } catch (Throwable t) {
        try {
            log.error("" ,t);
        } catch ( Throwable tt){}
        // Tell to close the socket
        return false ;
    }
    return true ;
}

Poller线程会从Poller.events队列中取出PollerEvent对象,并运行PollerEvent.run()方法。在PollerEvent.run()方法中发现是OP_REGISTER事件,则会在Poller.selector上注册SocketChannel对象的OP_READ就绪事件。

代码3:PollerEvent.run()方法代码片段

public void run() {
   if ( interestOps == OP_REGISTER ) {
       try {
           //在Poller.selector上注册OP_READ就绪事件
           socket.getIOChannel().register(socket .getPoller().getSelector(), SelectionKey.OP_READ , key );
       } catch (Exception x) {
           log.error("" , x);
       }
   }
   ......
} 

至此,一个客户端连接准备工作就已经完成了。我们获得了一个客户端的SocketChannel, 并注册OP_READ就绪事件到Poller.selector上(如图1)。下面就可以进行数据读写了。

图1:ServerSocketChannel和SocketChannel的初始化状态

2. Poller.selector的wakeup方法

Poller线程会做如下工作:

1) 通过selection操作获取已经选中的SelectionKey数量;

2) 执行Poller.events队列中的PollerEvent;

3) 处理已经选中的SelectionKey。

当有新PollerEvent对象加入Poller.events队列中,需要尽快执行第二步,而不应该阻塞的selection操作中。所以就需要配合Selector.wakeup()方法来实现这个需求。Tomcat使用信号量wakeupCounter来控制Selector.wakeup()方法,阻塞Selector.select()方法和非阻塞Selector.selectNow()方法的使用。

当有新PollerEvent对象加入Poller.events队列中,会按照如下条件执行Selector.wakeup()方法。

  • 当wakeupCounter加1后等于0,说明Poller.selector阻塞在selection操作,这时才需要调用Selector.wakeup()方法。
  • 当wakeupCounter加1后不等于0,说明Poller.selector没有阻塞在selection操作,则不需要调用Selector.wakeup()方法。并且为了尽快执行第二步,Poller线程在下一次直接调用非阻塞方法Selector.selectNow()。

代码4:Poller.addEvent()方法,实现将PollerEvent对象加入Poller.events队列中。

public void addEvent(Runnable event) {
   events.offer(event);
   //如果wakeupCount加1后等于0,则调用wakeup方法
   if ( wakeupCounter .incrementAndGet() == 0 ) selector.wakeup();
}



代码5: Poller线程的selection操作代码

if (wakeupCounter .get()>0) {
   keyCount = selector.selectNow();
 else {
   wakeupCounter.set(-1);
   keyCount = selector.select(selectorTimeout );
}
wakeupCounter.set(0);


这样的设计因为Java NIO的wakeup有如下的特性:

  • 在Selector对象上调用wakeup()方法将会导致第一个没有返回的selection操作立即返回。如果当前没有进行的selection操作,那么下一次的select()方法的调用将立即返回。而这个将wakeup行为延迟到下一个select()方法经常不是我们想要的(当然也不是Tomcat想要的)。我们一般只是想从sleeping的线程wakeup,但允许接下来的selection操作正常处理。

所以,Tomcat通过wakeupCounter信号量的变化来控制只有阻塞在selection操作的时候才调用Selector.wakeup()方法。当有新PollerEvent对象加入Poller.events队列中,并且没有处于阻塞在selection操作中,则直接调用非阻塞方法Selector.selectNow()。

3. 读(写)数据

Poller线程会调用Poller.processKey()方法处理已经选中的SelectionKey。

该方法会完成下面工作:

1)取消在Poller.selector上注册的OP_READ就绪事件;

2)启动工作线程来处理网络请求;

      2-1)读取和解析http请求数据

      2-2)如果是动态内容,则会调用用户自定义的Servlet类处理并返回结果给浏览器;如果是静态内容,则会直接返回静态资源数据给浏览器。

我们在这就不详细讨论http协议的实现以及Servlet的使用,直接跳到网络IO读写实现类NioSelectorPool。

NioSelectorPool类也提供了产生Selector对象的功能,通过NioSelectorPool.get()方法就可以获得一个Selector对象。

根据命令行参数-Dorg.apache.tomcat.util.net.NioSelectorShared的设置决定是否在SocketChannel中共享Selector。

  • 若会True(默认), 则所有的SocketChannel共享一个Selector;
  • 若为False,  则每一个SocketChannel使用不同的Selector(开启的Selector对象最多不超过NioSelectorPool.maxSelectors)。

从NioSelectorPool类中获得的Selector对象会传入到NioSelectorPool的read和write方法,并在网络IO读写时候使用。

NioSelectorPool类的读写方法提供了两种模式。通过方法的最后一个入参block控制。

1)读方法read():

  • block为False, 则是非阻塞模式。如果读不到数据,则直接返回了;如果读到数据则继续读。
  • block为True, 则是阻塞模式。如果第一次读取不到数据,会在NioSelectorPool提供的Selector对象上注册OP_READ就绪事件,并循环调用Selector.select(long)方法,超时等待OP_READ就绪事件。如果OP_READ事件已经就绪,并且接下来读到数据,则会继续读。read()方法整体会根据readTimeout设置进行超时控制。若超时,则会抛出SocketTimeoutException异常。

2)写方法write():

  • block为False, 则是非阻塞模式。写数据之前不会监听OP_WRITE事件。如果没有成功,则直接返回。
  • block为True, 则是阻塞模式。第一次写数据之前不会监听OP_WRITE就绪事件。如果没有写成功,则会在NioSelectorPool提供的selector注册OP_WRITE事件。并循环调用Selector.select(long)方法,超时等待OP_WRITE就绪事件。如果OP_WRITE事件已经就绪,并且接下来写数据成功,则会继续写数据。write方法整体会根据writeTimeout设置进行超时控制。如超时,则会抛出SocketTimeoutException异常。

另外如果是共享Selector(NioSelectorShared=true)并且阻塞模式(block=true),则会使用NioBlockingSelector类实现读写数据。该类与NioSelectorPool使用Java NIO的策略是类似的,但实现略有不同,本文就不详细分析了。

图2:事件注册在读写时候发生的变化

下面是NioSelectorPool的read方法,实现从网络IO中读取数据。该方法有5个参数:

  • buf 保存从网络IO中读取到的数据;
  • socket NioChannel对象,其中封装了SocketChannel;
  • selector 为block模式使用的Selector对象,在实际调用的时候,会将NioSelectorPool类提供的selector对象传进去;
  • readTimout 读超时时间;
  • block 是否是阻塞模式,上面已经说明阻塞和非阻塞模式的区别。

代码6:NioSelectorPool的read()方法

public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
    //如果是共享Selector和阻塞模式,则使用NioBlockingSelector实现数据读取
    if ( SHARED && block ) {
        return blockingSelector .read(buf,socket,readTimeout);
    }
    SelectionKey key = null;
    int read = 0;
    boolean timedout = false;
    //一开始我们认为是可以读的
    int keycount = 1; //assume we can read
    //开始时间
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
       //当没有超时,则继续读数据
        while ( (!timedout) ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only read if we were registered for a read
                cnt = socket.read(buf);
                if (cnt == -1) throw new EOFException();
                read += cnt;
                //如果读取到数据,则继续读
                if (cnt > 0) continue; //read some more
                //如果没有读取到数据,并且不是block模式,则直接break
                if (cnt==0 && (read>0 || (!block) ) ) break; //we are done reading
            }
            if ( selector != null ) {//perform a blocking read
                //在NioSelectionPool提供的selector上注册OP_READ事件
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
                else key.interestOps(SelectionKey.OP_READ);
                //调用Selector.select方法
                keycount = selector.select(readTimeout);
            }
            //计算是否超时
            if (readTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=readTimeout;
        } //while
          //如果超时,抛出SocketTimeoutException异常
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
        //在返回前,取消SelectionKey, 并将所有的key从selector中删掉
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return read;
}


下面是NioSelectorPool的写方法,实现向网络IO中写数据。该方法有5个参数:

  • buf 保存需要写入的数据;
  • socket NioChannel对象,其中封装了SocketChannel;
  • selector 为block模式使用的Selector对象,在实际调用的时候,会将NioSelectorPool类提供的selector对象传进去;
  • writeTimeout 写超时时间;
  • block 是否是阻塞模式,上面已经说明阻塞和非阻塞模式的区别;
  • lastWrite 最近写入数据的byte数量。

代码7:NioSelectorPool.write()方法

public int write(ByteBuffer buf, NioChannel socket, Selector selector,
                 long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException {
    //如果是共享Selector和阻塞模式,则使用NioBlockingSelector实现写数据
    if ( SHARED && block ) {
        return blockingSelector.write(buf,socket,writeTimeout,lastWrite);
    }
    SelectionKey key = null;
    int written = 0;
    boolean timedout = false;
   //一开始我们认为是可以读的 
   int keycount = 1; //assume we can write
   //记录开始时间
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
        while ( (!timedout) && buf.hasRemaining() ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only write if we were registered for a write
                cnt = socket.write(buf); //write the data
                if (lastWrite!=null) lastWrite.set(cnt);
                if (cnt == -1) throw new EOFException();
               
                written += cnt;
                 //如果写数据成功,重新记录超时开始时间,并继续读
                if (cnt > 0) {
                    time = System. currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
                //如果写入数据为0,并且是非阻塞模式,则直接退出
                if (cnt==0 && (!block)) break; //don't block
            }
            if ( selector != null ) {
                //在NioSelectorPool的selector注册OP_WRITE事件
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                else key.interestOps(SelectionKey.OP_WRITE);
                keycount = selector.select(writeTimeout);
            }
            //是否超时
            if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
        } //while
          //如果超时,则直接抛出SocketTimeoutException异常
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
         //在返回前,取消SelectionKey, 并将所有的key从selector中删掉
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return written;
}

4. 总结

Tomcat在使用Java NIO的时候,将ServerSocketChannel配置成阻塞模式,这样可以方便地对ServerSocketChannel编写程序。当accept方法获得一个SocketChannel,并没有立即从线程池中取出一个线程来处理这个SocketChannel,而是构建一个OP_REGISTER类型的PollerEvent,并放到Poller.events队列中。Poller线程会处理这个PollerEvent,发现是OP_REGISTER类型,会在Poller.selector上注册一个这个SocketChannel的OP_READ就绪事件。如图1所示。

因为Java NIO的wakeup特性,使用wakeupCount信号量控制Selector.wakeup()方法,非阻塞方法Selector.selectNow()和阻塞方法Selector.select()的调用。我们在编写Java NIO程序时候也可以参考这种方式。

在SocketChannel上读的时候,分成非阻塞模式和阻塞模式。

  • 非阻塞模式,如果读不到数据,则直接返回了;如果读到数据则继续读。
  • 阻塞模式。如果第一次读取不到数据,会在NioSelectorPool提供的Selector对象上注册OP_READ就绪事件,并循环调用Selector.select(long)方法,超时等待OP_READ就绪事件。如果OP_READ事件已经就绪,并且接下来读到数据,则会继续读。read()方法整体会根据readTimeout设置进行超时控制。若超时,则会抛出SocketTimeoutException异常。

在SocketChannel上写的时候也分成非阻塞模式和阻塞模式。

  • 非阻塞模式,写数据之前不会监听OP_WRITE事件。如果没有成功,则直接返回。
  • 阻塞模式。第一次写数据之前不会监听OP_WRITE就绪事件。如果没有写成功,则会在NioSelectorPool提供的selector注册OP_WRITE事件。并循环调用Selector.select(long)方法,超时等待OP_WRITE就绪事件。如果OP_WRITE事件已经就绪,并且接下来写数据成功,则会继续写数据。write方法整体会根据writeTimeout设置进行超时控制。如超时,则会抛出SocketTimeoutException异常。

在写数据的时候,开始没有监听OP_WRITE就绪事件,直接调用write()方法。这是一个乐观设计,估计网络大部分情况都是正常的,不会拥塞。如果第一次写没有成功,则说明网络可能拥塞,那么再等待OP_WRITE就绪事件。

阻塞模式的读写方法没有在原有的Poller.selector上注册就绪事件,而是使用NioSelectorPool类提供的Selector对象注册就绪事件。这样的设计可以将各个Channel的就绪事件分散注册到不同的Selector对象中,避免大量Channel集中注册就绪事件到一个Selector对象,影响性能。

5. 参考资料

1)Tomcat6.0.18源码

2)Ron Hitchens的Java NIO

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340