Netty使用


前言

Netty是一个高性能,异步事件驱动的NIO框架,基于Java进行开发。所有的IO操作都是异步非阻塞的,能够通过Future-Listener机制获取异步IO的结果。

一、第一个Netty程序

  1. 服务端
public class NettyServer {

    public static void main(String[] args) {
        // 1. 服务端组件,组装netty的组件
        new ServerBootstrap()
                // 2. BootEventLoop, 包含selector, thread
                .group(new NioEventLoopGroup())
                // 3. 连接服务器的ServerSocketChannel实现,accept事件
                .channel(NioServerSocketChannel.class)
                // 4. 添加响应的事件处理器
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());  // 将ByteBuf转换为字符
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override  // 添加读事件
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
                    }
                })
                // 6. 绑定端口
                .bind(8888);
    }
}
  1. 客户端
public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        // 1. 启动类
        new Bootstrap()
                // 2. 添加EventLoop
                .group(new NioEventLoopGroup())
                // 3. 添加客户端ServerSocket的实现
                .channel(NioSocketChannel.class)
                // 4. 添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 5. 连接服务器
                .connect(new InetSocketAddress("localhost", 8888))
                // 6. 连接建立前进行阻塞
                .sync()
                // 7. 创建Channel
                .channel()
                // 8. 向服务端发送数据并进行刷新
                .writeAndFlush("hello netty");
    }
}
  1. 执行流程

    在这里插入图片描述

channel: 数据传输的通道
handler:对于数据处理操作
pipeline:多个handler的集合可以称为pipeline
eventLoop: 包含boss和worker,boss线程负责连接的建立,worker具体处理数据。每一个线程对每一个channel负责到底(即使中途阻塞切换任务,但是在这个任务数据准备完成后仍是刚才的worker线程进行处理)

二、组件

2.1 EventLoop

EventLoop: 单线程的执行器,用来处理Channel上的io事件
而EventLoopGroup是一组EventLoop,对于每一个Channel会通过register()方法注册到一个EventLoop上,后续的操作都由这个EventLoop进行处理

  1. EventLoopGroup构造器
/* NioEventLoopGroup() 可以处理io请求,普通任务,定时任务
   参数含义:可以传递执行任务的线程数量,如果不传递参数会使用默认线程数:DEFAULT_EVENT_LOOP_THREADS
   默认分配为 系统Math.max(1, CPU核数 * 2)
*/
EventLoopGroup eventExecutors = new NioEventLoopGroup(2);
// DefaultEventLoop() 处理普通任务,定时任务
// EventLoopGroup eventExecutors = new DefaultEventLoop();

System.out.println("系统CPU核数:" + NettyRuntime.availableProcessors());

// next() 获取下一个执行的EventLoop对象,内部会根据分配的线程数提供轮询策略
System.out.println(eventExecutors.next());  // io.netty.channel.nio.NioEventLoop@18ef96
System.out.println(eventExecutors.next());  // io.netty.channel.nio.NioEventLoop@6956de9
// System.out.println(eventExecutors.next());  // io.netty.channel.nio.NioEventLoop@18ef96
  1. 普通任务和定时任务
// 添加一个普通任务,异步的进行处理
eventExecutors.next().execute(() -> {
    System.out.println("internal");
});
System.out.println("main");

// 添加一个定时任务
eventExecutors.next().scheduleAtFixedRate(() -> {
    System.out.println("schedule task");
}, 0, 1, TimeUnit.SECONDS);
  1. IO操作
    修改第一个Netty程序中的Client
public static void main(String[] args) throws InterruptedException {
        Channel ch = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println("Channel Object:" + ch);
        ch.writeAndFlush("");
    }

debug模式下,开启IDEA的并行运行,然后使用Evalute Expression

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


结论:EventLoopGroup是多线程环境,可以同时处理多个客户端;但是对于每一个客户端中的channel会和其中一个EventLoop进行绑定,下次的信息发送仍由这个EventLoop进行处理。

  1. Netty的职责划分
    修改第一个Netty程序中的服务端
new ServerBootstrap()
    // 职责划分:Netty中具有boss和worker线程组的概念,目的在于boss线程专门用于建立连接,worker用来处理任务
    // 对于parentGroup中由于服务端只有一个,所以最多绑定到一个线程上
    .group(new NioEventLoopGroup(), new NioEventLoopGroup())

第二种情况:每个EventLoop的底层采用多路复用技术,因此可以处理多个客户端的请求;但是一旦某一个客户端执行时间过长,会导致该EventLoop下的其他客户端阻塞;
解决:将耗时的服务交给其他的线程组进行处理

public static void main(String[] args) {
        EventLoopGroup group = new DefaultEventLoop();
        new ServerBootstrap()
                // 职责划分:Netty中具有boss和worker线程组的概念,目的在于boss线程专门用于建立连接,worker用来处理任务
                // 对于parentGroup中由于服务端只有一个,所以最多绑定到一个线程上
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 处理读事件
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 当没有使用StringDecoder的时候,此时msg为ByteBuf类型
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + ": " + buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg);  // 将数据将给下一个Handler进行处理
                            }
                        }).addLast(group, "other group", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 当没有使用StringDecoder的时候,此时msg为ByteBuf类型
                                ByteBuf buf = (ByteBuf) msg;
                                Thread.sleep(5000);  // 模拟耗时
                                System.out.println("耗时操作 =》" + Thread.currentThread().getName() + ": " + buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080);
    }

这里在外部额外创建一个DefaultEventLoop用于处理第二个Handler中出现的耗时处理状态;

  • 测试
    启动三个客户端,并且设置服务端worker线程为2,这样客户端1,3应该处于同一个EventLoop当中

    在这里插入图片描述

  • 结果
    这里的客户端1,3在同一个NioEventLoop处理,由于将耗时操作引入到DefaultEventLoop当中,即使客户端3的耗时操作先于客户端1,但是客户端第一个Handler的数据处理没有发生阻塞。
  1. 如何为handler切换线程?
    上述场景中的EventLoop分配状态如下:

    在这里插入图片描述

如何在粉色h1和绿色h2中间切换不同的EventLoop?

底层实现:AbstractChannelHandlerContext.java # invokeChannelRead

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 获取下一个需要执行的EventLoop
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    // 如果下一个执行的线程和当前线程是同一个EventLoop,在当前线程中继续处理。
        next.invokeChannelRead(m);
    } else {
    // 下一个线程executor和当前线程不同,将任务交付给executor进行处理
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

2.2 Channel

  1. 异步数据的传输

channel是数据传输的载体,在处理客户端连接建立的过程中,使用connet()方法可以获得ChannelFuture对象,在Netty中IO的操作都是异步的,因此对于每一个IO操作无法保证操作被实际完成,而ChannelFutrure对象就是异步非阻塞的。
在前面实先客户端的时候,代码如下:

ChannelFuture channelFuture = new Bootstrap()
                .... 
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture
                .sync()
                .channel();
        channel.writeAndFlush("hello");

上述代码获取ChannelFuture后会使用sync()方法进行阻塞,保证连接成功创建后,才会继续执行后续的数据发送操作。

上述的模式为同步,执行 *channel.writeAndFlush(“hello”);*的时候由 main 线程主导。


这是Netty中处理异步操作的一种解决措施,第二种是通过ChannelFuture中的addListener方法,监听直到连接创建,使得整个操作都是异步处理的。
channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Channel channel = future.channel();
                System.out.println(Thread.currentThread().getName() + " : " + channel);
                channel.writeAndFlush("hello");
            }
        });

在监听到连接创建后,会通过回调方式执行 operationComplete 方法,此时运行的线程为 nio 线程

在这里插入图片描述


2. channel的关闭

问题描述:修改上述代码,想要提供用户一个持续发送数据,并且输入 'q’程序便会退出的,以及提供关闭后的额外操作场景。

public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.addListener((ChannelFutureListener) future -> {
            Channel channel = future.channel();
            System.out.println(Thread.currentThread().getName() + " : " + channel);
            channel.writeAndFlush("hello");
        });

        Channel channel = channelFuture.channel();
        // 创建一个新的线程支持用户持续输入
        new Thread(new Runnable() {
            @Override
            public void run() {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String msg = scanner.nextLine();
                    if ("q".equals(msg)) {
                        channel.close();  // 异步操作
                        System.out.println(Thread.currentThread().getName() + " 程序退出");
                        break;
                    }
                    channel.writeAndFlush(msg);
                }
            }
        }, "input").start();

但是这里的 channel.close(); 属于Netty中的异步操作,因此无法保证 channel关闭和后续的处理之间的同步关系。

解决方案: Netty中对于channel的关闭提供了CloseFuture的对象,和ChannelFuture类似,也包含同步和异步两种模式。

  • 同步处理
    修改代码如下:
// 代码省略
new Thread(new Runnable() {
   @Override
    public void run() {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String msg = scanner.nextLine();
            if ("q".equals(msg)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(msg);
        }
    }
}, "input").start();

ChannelFuture closeFuture = channel.closeFuture();
System.out.println("关闭前阻塞");
closeFuture.sync();
System.out.println(Thread.currentThread().getName() + " 程序退出");

此时具体执行后续处理的部分为 main 线程

  • 异步处理
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture future) throws Exception {
       System.out.println(Thread.currentThread().getName() + " 程序退出");  // nioEventLoopGroup-2-1 程序退出
   }
});

问题:在上述方案添加后,Java程序并没有正常终止

在这里插入图片描述


原因: Netty中的EventLoopGroup中的其他线程仍在工作,需要手动进行关闭

NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        System.out.println(Thread.currentThread().getName() + " 程序退出");  // nioEventLoopGroup-2-1 程序退出
        group.shutdownGracefully();  // 优雅关闭EventLoopGroup
    }
});

在这里插入图片描述

2.3 Future & Promise

  1. JDK中的Future
    该Future提供异步等待,获取数据的方式;主线程中可以通过Future对象的get()阻塞当前线程,然后等待数据的获取
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executors = Executors.newFixedThreadPool(2);
    Future<Integer> future = executors.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            log.info("开始计算");
            Thread.sleep(1000);
            return 80;
        }
    });
    log.info("等待获取结果");
    Integer res = future.get();  // 主线程阻塞,等待结果返回
    log.info("计算结果:{}", res);

}
  1. Netty中的Future
    与JDK中的Future对象类似,是对其的一种封装;支持同步/异步获取数据的方式。
public static void main(String[] args) throws ExecutionException, InterruptedException {
   NioEventLoopGroup group = new NioEventLoopGroup();
    EventLoop eventLoop = group.next();  // 获取一个EventLoop对象

    // 提交一个异步任务
    Future<Integer> future = eventLoop.submit(() -> {
        log.info("开始计算");
        Thread.sleep(1000);
        return 80;
    });
    log.info("等待获取结果");
    Integer res = future.get();
    log.info("计算结果:{}", res);
}

整体代码的编写类似,只是在处理线程池的时候对于Netty中使用的EventLoop; Netty中还提供异步获取结果的方式(不需要get阻塞)。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    EventLoop eventLoop = group.next();  // 获取一个EventLoop对象

    // 提交一个异步任务
    Future<Integer> future = eventLoop.submit(() -> {
        log.info("开始计算");
        Thread.sleep(1000);
        return 80;
    });
    future.addListener(f -> {
        log.info("等待获取结果");
        // 立即获取数据
        log.info("{} 计算结果:{}", Thread.currentThread().getName(), f.getNow());  // nioEventLoopGroup-2-1 计算结果:80
    });
}
  1. Promise对象
    无论是JDK还是Netty中的Future对象,对于数据的处理都是由该对象的获取都需要通过线程池就行获取,无法手动创建;Promise可以实现用户自定义异步对象,然后存入相应的数据。
// Promise是对Future对象的进一步封装
public interface Promise<V> extends Future<V> 
public static void main(String[] args) throws ExecutionException, InterruptedException {

    EventLoop eventLoop = new NioEventLoopGroup().next();

    // 提供一个异步对象promise
    Promise<Integer> promise = new DefaultPromise<>(eventLoop);
    eventLoop.submit(() -> {
        log.info("开始计算");
        try {
            Thread.sleep(1000);
            promise.setSuccess(808); // 运行正常返回结果
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    log.info("等待获取结果");
    promise.addListener(future -> {
        // 立即获取数据
        log.info("{} 计算结果:{}", Thread.currentThread().getName(), future.getNow());  // nioEventLoopGroup-2-1 计算结果:808
    }); 
}

2.4 Pipeline

Pileline是一组Handler组成的链式结构,用来对chennel中的数据进行额外的处理。
对于Handler包含InBound以及OutBound两种,对应入站和出战两个操作;InBound在读数据的时候可以进行二次的修改然后传递给后续的Handler;而OutBound在处理写数据的时候才会触发。

在这里插入图片描述


整个pipeline是双向链表;inBoundHandler从Head处向后遍历;而对于OutBound从tail向前进行查找,二者的顺序是相反的。

new ServerBootstrap()
           .group(new NioEventLoopGroup(), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {
               @Override
               protected void initChannel(NioSocketChannel ch) throws Exception {
                   // 处理读事件
                   ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                       @Override
                       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                           ByteBuf buf = (ByteBuf) msg;
                           String newMsg = buf.toString(Charset.defaultCharset()) + "-hello";
                           log.info("server: {}", newMsg);
                           
                			// 注意这里的两行代码
//                                ctx.writeAndFlush(msg);
                           ch.writeAndFlush(newMsg);
                       }
                   }).addLast(new ChannelOutboundHandlerAdapter() {
                       @Override
                       public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                           log.info("outBound 1");
                           super.write(ctx, msg, promise);
                       }
                   });
               }
           }).bind(8080);
}

客户端发送hello, 服务端在原有基础上加工打印:

在这里插入图片描述

// ctx.writeAndFlush(msg);
ch.writeAndFlush(newMsg);

在InBound中存在这两行代码。发现,只有使用ch.writeAndFlush的时候才能正常打印OutBound中的内容。
原因:

  • 首先自由触发写事件 writexxx的时候才能执行OutBound
  • 整个handler链的结构为:head -> h1 -> h2 -> tail
    当使用 ch.writeAndFlush的时候会从tail开始向前查找OutBound;而如果使用 ctx.writeAndFlush,会从当前调用方法的InBound处在handler链中向前查找。
        由于此时handler链的结构为: head -> inbound -> outbound -> head,如果使用 ctx.xxxx,会从inBound开始向head方向查找outbound,此时无法找到。

2.5 ByteBuf

Netty中对于ByteBuffer的封装,是数据的载体

  1. ByteBuf的创建
public static void main(String[] args) {
    // 默认创建一个ByteBuf; 默认初始容量为256;
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    System.out.println(buf);
    // 支持动态扩容
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 300; i++) {
        sb.append("a");
    }
    buf.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
    // (ridx: 0, widx: 300, cap: 512), 最大容量512,进行2倍扩容
    System.out.println(buf);  
}

使用 ByteBufAllocator.DEFAULT.buffer(); 默认使用直接内存进行创建,并开启池化技术;减少ByteBuf的创建和销毁,提高整体性能,使用直接内容提高读写效率,相比于堆内存创建减少GC带来的影响。

// 创建的类对象:PooledUnsafeDirectByteBuf
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();

使用堆内存创建

ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();  // PooledUnsafeHeapByteBuf

ByteBuf的组成

在这里插入图片描述

扩容规则

在这里插入图片描述

  1. ByteBuf的优势
  • 池化技术,可以重用ByteBuf的实例,节约内容,减少内存溢出。
  • 读写指针分离,不需要像ByteBuffer进行切换
  • 自动扩容
  • 支持链式调用
  • 很多方法如:slice, duplicate, compositeByteBuf使用零拷贝技术。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340