Netty client stops reading socket data after a few days or hours

How can I fix a Netty client that stops reading socket data after a few days or hours?

I have the following setup:

Java 1.8

spring-boot-starter-parent 2.3.2.RELEASE

spring-webflux 5.2.8.RELEASE

spring-boot-starter-reactor-netty 2.3.2.RELEASE

The application runs on WebSphere Application Server 9.

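My application is a Netty client with reconnection support that reads data sent to a socket. It takes the data and searches for the frame's start and end delimiters; once both are found, the frame is passed to the next handler, which processes the information. After a few days or hours, the client stops capturing the data sent to the socket, and the only error in the log file is: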

2020-08-13 15:31:34,885 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected

This is the main class:

    public void run() {
        LOGGER.info("Levantando la aplicacion CAPTURADOR");
        closed = false;
        workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        super.channelInactive(ctx);
                        ctx.channel().eventLoop().schedule(() -> doConnect(),1,TimeUnit.SECONDS);
                    }
                });
                socketChannel.pipeline().addLast(frameExtractor);
                socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG",LogLevel.valueOf(logLevel)));
                socketChannel.pipeline().addLast(clientHandler);
            }
        });
        doConnect();
    }

    /**
     *
     */
    private void doConnect() {
        if (closed) {
            return;
        }
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost,remotePort));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    LOGGER.info("Started Tcp Client: " + getServerInfo());
                } else {
                    LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                    f.channel().eventLoop().schedule(() -> doConnect(),1,TimeUnit.SECONDS);
                }
            }
        });
    }

This is the FrameExtractor class:


    /**
     * Netty calls channelRegistered() when the channel is registered with its event loop;
     * we use it to create the accumulation buffer mentioned above.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer();
    }

    /**
     * Netty calls channelUnregistered() when the channel is deregistered from its event loop;
     * we use it to release the buffer created in channelRegistered().
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (null != buf) {
            buf.release();
            buf = null;
        }

    }

    /**
     * Assembles each measurement frame by searching for the end-of-frame marker and passes it to the next handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        try {
            Level level = ResourceLeakDetector.getLevel();
            // The msg parameter is a Netty ByteBuf. Append all of its content to the end of our
            // ByteBuf buf, accumulating bytes until the end-of-frame marker is found
            buf.writeBytes((ByteBuf) msg);

            String data = buf.toString(Charset.defaultCharset());

            int indexOf1 = indexOf(buf,Directlink.DELIMETER_DIRECTLINK,1);
            int indexOf2 = indexOf(buf,2);

            while (-1 != indexOf2) {
                // Create a new ByteBuf to copy the frame up to the end-of-frame marker
                ByteBuf line = ctx.alloc().buffer();
                line = buf.copy(indexOf1,indexOf2 - indexOf1);
                // Consume the bytes of buf up to the end-of-frame marker
                buf.readBytes(indexOf2);
                // Notify the next handler, passing it our line buffer. We do not release line because that is
                // the receiver's responsibility.
                ctx.fireChannelRead(line);
                buf.discardReadBytes();
                indexOf1 = indexOf(buf,1);
                indexOf2 = indexOf(buf,2);
            }
        } finally {
            // Release the buffer received as a parameter. We no longer need it and have not passed it on to
            // anyone, so releasing it is our responsibility.
            ReferenceCountUtil.release(msg);
        }
    }

This is the ClientHandler:

    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        ByteBuf buffer = ctx.alloc().buffer();
        try {
            buffer.writeBytes((ByteBuf) msg);
            byte[] bytes = new byte[buffer.readableBytes()];
            int readerIndex = buffer.readerIndex();
            buffer.getBytes(readerIndex,bytes);
            bytes = CapturadorUtils.eliminarParidad(bytes);
            String trama = new String(bytes);
            CapturadorGenerico capturadorGenerico = trama.contains(Directlink.KEY_DIRECTLINK)
                    ? capturadorFactory.getCapturador(Directlink.getDirectlink())
                    : capturadorFactory.getCapturador(Microcom.MICROCOM);
            capturadorGenerico.parsearTrama(trama,bytes);
        } catch (Exception e) {
            LOGGER.error("Error producido en el pipe ClientHandler con la trama: " + msg,e);
        } finally {
            // Release the buffer received as a parameter. We no longer need it and have not passed it on to
            // anyone, so releasing it is our responsibility.
            ReferenceCountUtil.release(msg);
            ReferenceCountUtil.release(buffer);
        }
    }

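Having reviewed the code and read the documentation at https://netty.io/wiki/reference-counted-objects.html, I cannot find what the error might be. The buffers appear to be released correctly.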

Adding the log output:

2020-08-14 11:23:30,404 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:173)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
    cl.mop.dga.satelital.capturador.handler.FrameExtractor.channelRead(FrameExtractor.java:76)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:682)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:617)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:534)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:785)

The leak is reported at these lines:

    ByteBuf line = ctx.alloc().buffer();
    line = buf.copy(indexOf1,indexOf2 - indexOf1);

I changed the method:

    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
        try {
            // The msg parameter is a Netty ByteBuf. Append all of its content to the end of our
            // ByteBuf buf, accumulating bytes until the end-of-frame marker is found
            buf.writeBytes((ByteBuf) msg);

            String data = buf.toString(Charset.defaultCharset());
            LOGGER.info("Trama recibida: " + data);

            int indexOf1 = indexOf(buf,Directlink.DELIMETER_DIRECTLINK,1);
            int indexOf2 = indexOf(buf,2);

            while (-1 != indexOf2) {
                // Copy the frame, up to the end-of-frame marker, directly into a new ByteBuf
                ByteBuf line = buf.copy(indexOf1,indexOf2 - indexOf1);
                buf.readBytes(indexOf2);
                ctx.fireChannelRead(line);
                buf.discardReadBytes();
                indexOf1 = indexOf(buf,1);
                indexOf2 = indexOf(buf,2);
            }
        } finally {
            // Release the buffer received as a parameter. We no longer need it and have not passed it on to
            // anyone, so releasing it is our responsibility.
            ReferenceCountUtil.release(msg);
        }
    }

The error still occurs:

2020-08-27 16:33:36,256 ERROR [nioEventLoopGroup-2-1] i.n.u.ResourceLeakDetector [?:?] LEAK: ByteBuf.release() was not called before it's garbage-collected

but now it points at this line:

    buf.readBytes(indexOf2);

Adding the ClientWithNettyHandlers class:

public class ClientWithNettyHandlers extends SpringBootServletInitializer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClientWithNettyHandlers.class);

    @Autowired
    @Qualifier("ClientHandler")
    ClientHandler clientHandler;

    @Autowired
    @Qualifier("FrameExtractor")
    FrameExtractor frameExtractor;

    private volatile EventLoopGroup workerGroup;
    private volatile Bootstrap bootstrap;
    private volatile boolean closed = false;
    private String remoteHost;
    private int remotePort;
    private String logLevel;

    @Bean
    public void run() {
        closed = false;
        workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addFirst(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        super.channelInactive(ctx);
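                        ctx.channel().eventLoop().schedule(() -> doConnect(),1,TimeUnit.SECONDS);
                    }
                });
                socketChannel.pipeline().addLast(frameExtractor);
                socketChannel.pipeline().addLast(new LoggingHandler("SERVER_LOG",LogLevel.valueOf(logLevel)));
                socketChannel.pipeline().addLast(clientHandler);
            }
        });
        doConnect();
    }

    /**
     * Connects to the remote host and retries after one second if the attempt fails.
     */
    private void doConnect() {
        if (closed) {
            return;
        }
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(remoteHost,remotePort));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    LOGGER.info("Started Tcp Client: " + getServerInfo());
                } else {
                    LOGGER.error("Started Tcp Client Failed: " + getServerInfo());
                    f.channel().eventLoop().schedule(() -> doConnect(),1,TimeUnit.SECONDS);
                }
            }
        });
    }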

    /**
     *
     */
    @PreDestroy
    public void closeNettyClient() {
        close();
        System.out.println("Shutting down Netty Client: " + getServerInfo());
    }

    /**
     *
     */
    public void close() {
        closed = true;
        Future<?> future = workerGroup.shutdownGracefully();
        future.syncUninterruptibly();
        LOGGER.info("Stopped Tcp Client: " + getServerInfo());
    }

    /**
     *
     * @return
     */
    private String getServerInfo() {
        return String.format("RemoteHost=%s RemotePort=%d",remoteHost,remotePort);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(ClientWithNettyHandlers.class);
    }

    /**
     *
     * @param args
     */
    public static void main(String[] args) {
        InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
        SpringApplication.run(ClientWithNettyHandlers.class,args);
    }

}

Solution

The leak is here:

            // Create a new ByteBuf to copy the frame up to the end-of-frame marker
            ByteBuf line = ctx.alloc().buffer();
            line = buf.copy(indexOf1,indexOf2 - indexOf1);

This leaks the buffer allocated by ctx.alloc().buffer(), because line is reassigned before release() is called on that buffer. You can fix this, for example, with:

            ByteBuf line = buf.copy(indexOf1,indexOf2 - indexOf1);
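
The mechanics of that leak can be shown without Netty. The sketch below is a toy model only: ToyBuf and its shared counter are hypothetical stand-ins for a pooled, reference-counted buffer and are not part of Netty's API. It counts buffers that were allocated but never released, first with the pattern from the question and then with the corrected one.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for a pooled, reference-counted buffer. Hypothetical, not Netty's API. */
final class ToyBuf {
    private final AtomicInteger live;   // shared count of buffers not yet released
    private boolean released;

    ToyBuf(AtomicInteger live) {
        this.live = live;
        live.incrementAndGet();
    }

    /** Returns a new, independent buffer, in the way ByteBuf.copy(...) does. */
    ToyBuf copy() {
        return new ToyBuf(live);
    }

    void release() {
        if (!released) {
            released = true;
            live.decrementAndGet();
        }
    }
}

final class RefCountLeakDemo {
    /** Pattern from the question: allocate, then overwrite the only reference. */
    static int leakyPattern() {
        AtomicInteger live = new AtomicInteger();
        ToyBuf accumulator = new ToyBuf(live);
        ToyBuf line = new ToyBuf(live);   // stands in for ctx.alloc().buffer()
        line = accumulator.copy();        // the first buffer is now unreachable
        line.release();                   // the downstream handler releases the copy
        accumulator.release();
        return live.get();                // number of buffers never released
    }

    /** Corrected pattern: the copy is the only buffer created for the frame. */
    static int fixedPattern() {
        AtomicInteger live = new AtomicInteger();
        ToyBuf accumulator = new ToyBuf(live);
        ToyBuf line = accumulator.copy();
        line.release();
        accumulator.release();
        return live.get();
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + leakyPattern());   // prints "leaky: 1"
        System.out.println("fixed: " + fixedPattern());   // prints "fixed: 0"
    }
}
```

With a real pooled allocator, each buffer lost this way stays checked out of the pool until the leak detector notices it has been garbage-collected, which is what the LEAK message reports.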

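In the end I resolved the problem by changing the last handler in the pipeline: for each frame obtained it now creates a thread and hands the frame to it, so the application's own processing (parsing, database inserts, and so on) continues on that thread. Perhaps Netty has some kind of protective mechanism in its buffer handling that is triggered when processing of the buffer holding the frames is delayed until the start and end of a frame have been determined. I want to stress that the errors I originally had in handling the buffers were fixed by @NormanMaurer's suggestion to use skipBytes(...): readBytes(int) returns a newly allocated ByteBuf that must itself be released, whereas skipBytes(int) only advances the reader index.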
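
The hand-off described above can be sketched with the JDK alone. This is a minimal illustration under assumptions, not the code from the application: FrameDispatcher and its methods are hypothetical names, and a small fixed thread pool replaces the one-thread-per-frame approach so that the number of threads stays bounded.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper that moves frame processing off the I/O thread. */
final class FrameDispatcher {
    private final ExecutorService workers;
    private final Consumer<byte[]> processor;

    FrameDispatcher(int threads, Consumer<byte[]> processor) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.processor = processor;
    }

    /** Queues one frame for processing; the caller does not wait for it to be handled. */
    void dispatch(byte[] frame) {
        workers.execute(() -> processor.accept(frame));
    }

    /** Stops accepting frames and waits for queued work; returns true if everything finished in time. */
    boolean shutdown(long timeout, TimeUnit unit) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class FrameDispatcherDemo {
    public static void main(String[] args) {
        // The processor stands in for parsing and database inserts.
        FrameDispatcher dispatcher = new FrameDispatcher(2,
                frame -> System.out.println("processed " + new String(frame, StandardCharsets.US_ASCII)));
        dispatcher.dispatch("frame-1".getBytes(StandardCharsets.US_ASCII));
        dispatcher.dispatch("frame-2".getBytes(StandardCharsets.US_ASCII));
        dispatcher.shutdown(5, TimeUnit.SECONDS);
    }
}
```

In a handler like ClientHandler, the frame bytes would first be copied out of the ByteBuf into a byte[] (as the existing getBytes call already does) and the ByteBuf released on the event loop, so that no reference-counted object is passed to another thread.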
