聊聊NettyConnector的start及shutdown

本文主要研究一下NettyConnector的start及shutdown

NettyConnector

reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/NettyConnector.java

/**
 * A Netty connector is an inbound/outbound factory sharing configuration but usually no
 * runtime
 * (connection...) state at the exception of shared connection pool setups. Subscribing
 * to the returned {@link Mono} will effectively
 * create a new stateful "client" or "server" socket depending on the implementation.
 * It might also be working on top of a socket pool or connection pool as well,but the
 * state should be safely handled by the pool itself.
 * <p>
 * <p>Clients or Receivers will onSubscribe when their connection is established. They
 * will complete when the unique returned closing {@link Publisher} completes itself or if
 * the connection is remotely terminated. Calling the returned {@link
 * Disposable#dispose()} from {@link Mono#subscribe()} will terminate the subscription
 * and underlying connection from the local peer.
 * <p>
 * <p>Servers or Producers will onSubscribe when their socket is bound locally. They will
 * never complete as many {@link Publisher} close selectors will be expected. Disposing
 * the returned {@link Mono} will safely call shutdown.
 *
 * @param <INBOUND> incoming traffic API such as server request or client response
 * @param <OUTBOUND> outgoing traffic API such as server response or client request
 * @author Stephane Maldini
 * @since 0.6
 */
public interface NettyConnector<INBOUND extends NettyInbound,OUTBOUND extends NettyOutbound> {

    /**
     * Prepare a {@link BiFunction} IO handler that will react on a new connected state
     * each
     * time
     * the returned  {@link Mono} is subscribed. This {@link NettyConnector} shouldn't assume
     * any state related to the individual created/cleaned resources.
     * <p>
     * The IO handler will return {@link Publisher} to signal when to terminate the
     * underlying resource channel.
     *
     * @param ioHandler the in/out callback returning a closing publisher
     *
     * @return a {@link Mono} completing with a {@link Disposable} token to dispose
     * the active handler (server,client connection...) or failing with the connection
     * error.
     */
    Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND,? super OUTBOUND,? extends Publisher<Void>> ioHandler);

    /**
     * Start a Client or Server in a blocking fashion,and wait for it to finish initializing.
     * The returned {@link BlockingNettyContext} class offers a simplified API around operating
     * the client/server in a blocking fashion,including to {@link BlockingNettyContext#shutdown() shut it down}.
     *
     * @param handler the handler to start the client or server with.
     * @param <T>
     * @return a {@link BlockingNettyContext}
     */
    default <T extends BiFunction<INBOUND,OUTBOUND,? extends Publisher<Void>>>
    BlockingNettyContext start(T handler) {
        return new BlockingNettyContext(newHandler(handler),getClass().getSimpleName());
    }

    /**
     * Start a Client or Server in a blocking fashion,including to {@link BlockingNettyContext#shutdown() shut it down}.
     *
     * @param handler the handler to start the client or server with.
     * @param timeout wait for Client/Server to start for the specified timeout.
     * @param <T>
     * @return a {@link BlockingNettyContext}
     */
    default <T extends BiFunction<INBOUND,? extends Publisher<Void>>>
    BlockingNettyContext start(T handler,Duration timeout) {
        return new BlockingNettyContext(newHandler(handler),getClass().getSimpleName(),timeout);
    }

    /**
     * Start a Client or Server in a fully blocking fashion,not only waiting for it to
     * initialize but also blocking during the full lifecycle of the client/server.
     * Since most servers will be long-lived,this is more adapted to running a server
     * out of a main method,only allowing shutdown of the servers through sigkill.
     * <p>
     * Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added
     * by this method in order to properly disconnect the client/server upon receiving
     * a sigkill signal.
     *
     * @param handler the handler to execute.
     */
    default <T extends BiFunction<INBOUND,? extends Publisher<Void>>>
    void startAndAwait(T handler) {
        startAndAwait(handler,null);
    }

    /**
     * Start a Client or Server in a fully blocking fashion,only allowing shutdown of the servers through sigkill.
     * <p>
     * Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added
     * by this method in order to properly disconnect the client/server upon receiving
     * a sigkill signal.
     *
     * @param handler the handler to execute.
     * @param onStart an optional callback to be invoked once the client/server has finished
     * initializing.
     */
    default <T extends BiFunction<INBOUND,? extends Publisher<Void>>>
    void startAndAwait(T handler,@Nullable Consumer<BlockingNettyContext> onStart) {
        BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler),getClass().getSimpleName());

        facade.installShutdownHook();

        if (onStart != null) {
            onStart.accept(facade);
        }

        facade.getContext()
              .onClose()
              .block();
    }
}
可以看到这个类有5个方法,一个newHandler是non-blocking模式的,其他的几个start开头的都是blocking模式的( duration参数用于指定等待初始化完成的超时时间),使用的是BlockingNettyContext

newHandler

newHandler返回的是一个Mono<? extends NettyContext>,在这个mono完成的时候,会自己dispose。

实例如下

@Test
    public void testNewHandler() throws InterruptedException {
        TcpClient client = TcpClient.create("localhost",9090);
        Mono<? extends NettyContext> mono = client.newHandler((inbound,outbound) -> {
            return outbound.sendString(Mono.just("Hello World!")).then();
        });
        
        CountDownLatch latch = new CountDownLatch(1);

        Disposable disposable = mono
                .doFinally(e -> {
                    System.out.println("finish:"+e);
                    latch.countDown();
                })
                .subscribe();

        latch.await();
        System.out.println(disposable.isDisposed());
    }

start

start方法返回的是BlockingNettyContext,用户可以调用BlockingNettyContext的shutdown方法来dispose nettyContext,比如

@Test
    public void testShutdown(){
        TcpClient client = TcpClient.create("localhost",9090);
        CountDownLatch latch = new CountDownLatch(1);
        BlockingNettyContext context = client.start((inbound,outbound) -> {
            latch.countDown();
            return outbound.sendString(Mono.just("hello world"))
                    .then();
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            context.shutdown();
        }

    }

reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/tcp/BlockingNettyContext.java

/**
     * Shut down the {@link NettyContext} and wait for its termination,up to the
     * {@link #setLifecycleTimeout(Duration) lifecycle timeout}.
     */
    public void shutdown() {
        if (context.isDisposed()) {
            return;
        }

        removeShutdownHook(); //only applies if not called from the hook's thread

        context.dispose();
        context.onClose()
               .doOnError(e -> LOG.error("Stopped {} on {} with an error {}",description,context.address(),e))
               .doOnTerminate(() -> LOG.info("Stopped {} on {}",context.address()))
               .timeout(lifecycleTimeout,Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
               .block();
    }

    /**
     * Remove a {@link Runtime#removeShutdownHook(Thread) JVM shutdown hook} if one was
     * {@link #installShutdownHook() installed} by this {@link BlockingNettyContext}.
     *
     * @return true if there was a hook and it was removed,false otherwise.
     */
    public boolean removeShutdownHook() {
        if (this.shutdownHook != null && Thread.currentThread() != this.shutdownHook) {
            Thread sdh = this.shutdownHook;
            this.shutdownHook = null;
            return Runtime.getRuntime().removeShutdownHook(sdh);
        }
        return false;
    }
这里的shutdown主要是移除当前的shutdownHook,然后dispose nettyContext

startAndAwait

startAndAwait方法调用了BlockingNettyContext的installShutdownHook来进行关闭
reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/tcp/BlockingNettyContext.java

/**
     * Install a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} that will
     * shutdown this {@link BlockingNettyContext} if the JVM is terminated externally.
     * <p>
     * The hook is removed if shutdown manually,and subsequent calls to this method are
     * no-op.
     */
    public void installShutdownHook() {
        //don't return the hook to discourage uninstalling it externally
        if (this.shutdownHook != null) {
            return;
        }
        this.shutdownHook = new Thread(this::shutdownFromJVM);
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    protected void shutdownFromJVM() {
        if (context.isDisposed()) {
            return;
        }

        final String hookDesc = Thread.currentThread().toString();

        context.dispose();
        context.onClose()
               .doOnError(e -> LOG.error("Stopped {} on {} with an error {} from JVM hook {}",e,hookDesc))
               .doOnTerminate(() -> LOG.info("Stopped {} on {} from JVM hook {}",hookDesc))
               .timeout(lifecycleTimeout,Mono.error(new TimeoutException(description +
                       " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
               .block();
    }
在shutdownHook里头注册了shutdownFromJVM方法,用于关闭NettyContext。

实例

@Test
    public void testStartAndAwait(){
        TcpClient client = TcpClient.create("localhost",9090);
        client.startAndAwait((inbound,outbound) -> {
            return outbound.sendString(Mono.just("hello world"))
                    .then();
        });
    }

小结

NettyConnector提供了non-blocking及blocking两种使用方式,non-blocking的话,使用newHandler返回一个Mono<? extends NettyContext>,在它会在完成的时候,自己dispose nettyContext;blocking的话,startAndAwait方法会自动帮你注册shutdownHook来dispose nettyContext,而start方法则返回BlockingNettyContext,允许调用shutdown方法来dispose nettyContext。

doc

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


react 中的高阶组件主要是对于 hooks 之前的类组件来说的,如果组件之中有复用的代码,需要重新创建一个父类,父类中存储公共代码,返回子类,同时把公用属性...
我们上一节了解了组件的更新机制,但是只是停留在表层上,例如我们的 setState 函数式同步执行的,我们的事件处理直接绑定在了 dom 元素上,这些都跟 re...
我们上一节了解了 react 的虚拟 dom 的格式,如何把虚拟 dom 转为真实 dom 进行挂载。其实函数是组件和类组件也是在这个基础上包裹了一层,一个是调...
react 本身提供了克隆组件的方法,但是平时开发中可能很少使用,可能是不了解。我公司的项目就没有使用,但是在很多三方库中都有使用。本小节我们来学习下如果使用该...
mobx 是一个简单可扩展的状态管理库,中文官网链接。小编在接触 react 就一直使用 mobx 库,上手简单不复杂。
我们在平常的开发中不可避免的会有很多列表渲染逻辑,在 pc 端可以使用分页进行渲染数限制,在移动端可以使用下拉加载更多。但是对于大量的列表渲染,特别像有实时数据...
本小节开始前,我们先答复下一个同学的问题。上一小节发布后,有小伙伴后台来信问到:‘小编你只讲了类组件中怎么使用 ref,那在函数式组件中怎么使用呢?’。确实我们...
上一小节我们了解了固定高度的滚动列表实现,因为是固定高度所以容器总高度和每个元素的 size、offset 很容易得到,这种场景也适合我们常见的大部分场景,例如...
上一小节我们处理了 setState 的批量更新机制,但是我们有两个遗漏点,一个是源码中的 setState 可以传入函数,同时 setState 可以传入第二...
我们知道 react 进行页面渲染或者刷新的时候,会从根节点到子节点全部执行一遍,即使子组件中没有状态的改变,也会执行。这就造成了性能不必要的浪费。之前我们了解...
在平时工作中的某些场景下,你可能想在整个组件树中传递数据,但却不想手动地通过 props 属性在每一层传递属性,contextAPI 应用而生。
楼主最近入职新单位了,恰好新单位使用的技术栈是 react,因为之前一直进行的是 vue2/vue3 和小程序开发,对于这些技术栈实现机制也有一些了解,最少面试...
我们上一节了了解了函数式组件和类组件的处理方式,本质就是处理基于 babel 处理后的 type 类型,最后还是要处理虚拟 dom。本小节我们学习下组件的更新机...
前面几节我们学习了解了 react 的渲染机制和生命周期,本节我们正式进入基本面试必考的核心地带 -- diff 算法,了解如何优化和复用 dom 操作的,还有...
我们在之前已经学习过 react 生命周期,但是在 16 版本中 will 类的生命周期进行了废除,虽然依然可以用,但是需要加上 UNSAFE 开头,表示是不安...
上一小节我们学习了 react 中类组件的优化方式,对于 hooks 为主流的函数式编程,react 也提供了优化方式 memo 方法,本小节我们来了解下它的用...
开源不易,感谢你的支持,❤ star me if you like concent ^_^
hel-micro,模块联邦sdk化,免构建、热更新、工具链无关的微模块方案 ,欢迎关注与了解
本文主题围绕concent的setup和react的五把钩子来展开,既然提到了setup就离不开composition api这个关键词,准确的说setup是由...
ReactsetState的执行是异步还是同步官方文档是这么说的setState()doesnotalwaysimmediatelyupdatethecomponent.Itmaybatchordefertheupdateuntillater.Thismakesreadingthis.staterightaftercallingsetState()apotentialpitfall.Instead,usecom