使用高性能Pipelines构建.NET通讯程序

.NET Standard支持一组新的API,System.Span,System.Memory,还有System.IO.Pipelines。这几个新的API极大了提升了.NET程序的效能,将来.NET很多基础API都会使用它们进行重写。

Pipelines旨在解决.NET编写Socket通信程序时的很多困难,相信读者也对此不胜其烦,使用stream模型进行编程,就算能够解决,也是实在麻烦。

System.IO.Pipelines使用简单的内存片段来管理数据,可以极大的简化编写程序的过程。关于Pipelines的详细介绍,可以看看这里。现在ASP.NET Core中使用的Kestrel已经在使用这个API。(话说这个东西貌似就是Kestrel团队搞出来的。)

可能是直接需要用Socket场景有限(物联网用的还挺多的),Pipelines相关的资料感觉不是很多。官方给出的示例是基于ASCII协议的,有固定结尾的协议,这里我以物联网设备常用的BINARY二进制自定义协议为例,讲解基于Pipelines的程序套路。

System.IO.Pipelines

与基于Stream的方式不同,pipelines提供一个pipe,用于存储数据,pipe中间存储的数据有点链表的感觉,可以基于SequencePosition进行slice操作,这样就能得到一个ReadOnlySequence<T>对象。reader可以进行自定义操作,并在操作完成之后告诉pipe已经处理了多少数据,整个过程是不需要进行内存复制操作的,因此性能得到了提升,还少了很多麻烦。可以简单理解作为服务器端,流程:

接受数据循环:接到数据->放pipe里面->告诉pipe放了多少数据
处理数据循环:在pipe里面找一条完整数据->交给处理流程->告诉pipe处理了多少数据

协议

有一款设备,binary协议,数据包开头0x75,0xbd,0x7e,0x97一共4个字节,随后跟数据包长度2个字节(固定2400字节,不固定长度也可以参照),随后是数据区。在设备连接成功之后,数据主动从设备发送到PC。

关键代码

虽然是.NET Core平台的,但是.NET FRAMEWORK 4.6.1上面也可以nuget安装,直接

install-package system.io.pipelines

进行安装就可以了。Socket相关处理的代码不再写了,只列关键的。

代码第一步是声明pipe。

private async void InitPipe(Socket socket)
{
    Pipe pipe = new Pipe();
    Task writing = FillPipeAsync(socket,pipe.Writer);
    Task reading = ReadPipeAsync(socket,pipe.Reader);

    await Task.WhenAll(reading,writing);
}

pipe有reader还有一个writer,reader负责读取pipe数据,主要用在数据处理循环,writer负责将数据写入pipe,主要用在数据接受循环。

//写入循环
private async Task FillPipeAsync(Socket socket,PipeWriter writer)
{
    //数据流量比较大,用1M字节作为buffer
    const int minimumBufferSize = 1024 * 1024;

    while (running)
    {
        try
        {
            //从writer中,获得一段不少于指定大小的内存空间
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);

            //将内存空间变成ArraySegment,提供给socket使用
            if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory,out ArraySegment<byte> arraySegment))
            {
                throw new InvalidOperationException("Buffer backed by array was expected");
            }
            //接受数据
            int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket,arraySegment,SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }

            //一次接受完毕,数据已经在pipe中,告诉pipe已经给它写了多少数据。
            writer.Advance(bytesRead);
        }
        catch
        {
            break;
        }

        // 提示reader可以进行读取数据,reader可以继续执行readAsync()方法
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // 告诉pipe完事了
    writer.Complete();
}

//读取循环
private async Task ReadPipeAsync(Socket socket,PipeReader reader)
{
    while (running)
    {
        //等待writer写数据
        ReadResult result = await reader.ReadAsync();
        //获得内存区域
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;

        do
        {
            //寻找head的第一个字节所在的位置
            position = buffer.PositionOf((byte)0x75);
            if (position != null)
            {
                //由于是连续四个字节作为head,需要进行比对,我这里直接使用了ToArray方法,还是有了内存拷贝动作,不是很理想,但是写起来很方便。
                //对性能有更高要求的场景,可以进行slice操作后的单独比对,这样不需要内存拷贝动作
                var headtoCheck = buffer.Slice(position.Value,4).ToArray();
                //SequenceEqual需要引用System.Linq
                if (headtoCheck.SequenceEqual(new byte[] { 0x75,0x97 }))
                {
                    //到这里,认为找到包开头了(从position.value开始),接下来需要从开头处截取整包的长度,需要先判断长度是否足够
                    if (buffer.Slice(position.Value).Length >= 2400)
                    {
                        //长度足够,那么取出ReadOnlySequence,进行操作
                        var mes = buffer.Slice(position.Value,2400);
                        //这里是数据处理的函数,可以参考官方文档对ReadOnlySequence进行操作,文档里面使用了span,那样性能会好一些。我这里简单实用ToArray()操作,这样也有了内存拷贝的问题,但是处理的直接是byte数组了。
                        await ProcessMessage(mes.ToArray());
                        //这一段就算是完成了,从开头位置,一整个包的长度就算完成了
                        var next = buffer.GetPosition(2400,position.Value);
                        //将buffer处理过的舍弃,替换为剩余的buffer引用
                        buffer = buffer.Slice(next);
                    }
                    else
                    {
                        //长度不够,说明数据包不完整,等下一波数据进来再拼接,跳出循环。
                        break;
                    }
                }
                else
                {
                    //第一个是0x75但是后面不匹配,可能有数据传输问题,那么需要舍弃第一个,0x75后面的字节开始再重新找0x75
                    var next = buffer.GetPosition(1,position.Value);
                    buffer = buffer.Slice(next);
                }
            }
        }
        while (position != null);

        //数据处理完毕,告诉pipe还剩下多少数据没有处理(数据包不完整的数据,找不到head)
        reader.AdvanceTo(buffer.Start,buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    reader.Complete();
}

以上代码基本解决了以下问题:

  • 数据接收不完整,找不到开头结尾,导致数据大量丢弃,或者自己维护一个queue的代码复杂性
  • 数据接收与处理的同步问题
  • 一次性收到多条数据的情况

后记

本文只是解释了pipeline处理的模式,对于茫茫多的ToArray方法,可以使用基于Span的操作进行优化(有时间就来填坑)。另外,如果在await ProcessMessage(mes.ToArray());这里,直接使用Task.Run(()=>ProcessMessage(mes);代替的话,实测会出现莫名其妙的问题,很有可能是pipe运行快,在系统调度Task之前,已经将内存释放导致的,如果需要优化这一块的话,需要格外注意。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


引言 本文从Linux小白的视角, 在CentOS 7.x服务器上搭建一个Nginx-Powered AspNet Core Web准生产应用。 在开始之前,我们还是重温一下部署原理,正如你所常见的.Net Core 部署图: 在Linux上部署.Net Core App最好的方式是在Linux机器
引言: 多线程编程/异步编程非常复杂,有很多概念和工具需要去学习,贴心的.NET提供Task线程包装类和await/async异步编程语法糖简化了异步编程方式。 相信很多开发者都看到如下异步编程实践原则: 遵守以上冷冰冰的②③条的原则,可保证异步程序按照预期状态正常运作;我们在各大编程论坛常看到违背
一. 宏观概念 ASP.NET Core Middleware是在应用程序处理管道pipeline中用于处理请求和操作响应的组件。 每个组件是pipeline 中的一环。 自行决定是否将请求传递给下一个组件 在处理管道的下个组件执行之前和之后执行业务逻辑 二. 特性和行为 ASP.NET Core处
背景 在.Net和C#中运行异步代码相当简单,因为我们有时候需要取消正在进行的异步操作,通过本文,可以掌握 通过CancellationToken取消任务(包括non-cancellable任务)。 Task&#160;表示无返回值的异步操作, 泛型版本Task&lt;TResult&gt;表示有返
HTTP基本认证 在HTTP中,HTTP基本认证(Basic Authentication)是一种允许网页浏览器或其他客户端程序以(用户名:口令) 请求资源的身份验证方式,不要求cookie,session identifier、login page等标记或载体。 - 所有浏览器据支持HTTP基本认
1.Linq 执行多列排序 OrderBy的意义是按照指定顺序排序,连续两次OrderBy,后面一个有可能会打乱前面一个的排序顺序,可能与预期不符。 要实现sql中的order by word,name类似效果; LINQ 有ThenBy可以紧接使用, ThenBy记住原本排序的值,然后再排其他值,
ASP.NET Core 核心特性:开源、跨平台、高性能是其决战JAVA的必胜法宝,最引人关注的跨平台特性 到底是怎么实现? &#xA; 本文分Unix、Windows剖析跨平台内幕,读完让你大呼过瘾。
前导 Asynchronous programming Model(APM)异步编程模型以BeginMethod(...) 和 EndMethod(...)结对出现。 IAsyncResult BeginGetResponse(AsyncCallback callback, object state
引言 最近在公司开发了一个项目,项目部署架构图如下: 思路 如图中文本所述,公司大数据集群不允许直接访问外网,需要一个网关服务器代理请求,本处服务器A就是边缘代理服务器的作用。 通常技术人员最快捷的思路是在服务器A上部署IISʺpplication Request Routing Module组件
作为一枚后端程序狗,项目实践常遇到定时任务的工作,最容易想到的的思路就是利用Windows计划任务/wndows service程序/Crontab程序等主机方法在主机上部署定时任务程序/脚本。 但是很多时候,若使用的是共享主机或者受控主机,这些主机不允许你私自安装exe程序、Windows服务程序
引言 熟悉TPL Dataflow博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工作流任务, 在使用Docker部署的过程中, 有一个问题一直无法回避: 在单体程序部署的瞬间(服务不可用)会有少量流量无法处理;更糟糕的情况下,迭代部署的这个版本有问题,上线后无法运作, 更多的流
合格的web后端程序员,除搬砖技能,还必须会给各种web服务器配置Https,本文结合ASP.NET Core部署模型聊一聊启用Https的方式。 温故知新 目前常见的Http请求明文传输,请求可能被篡改,访问的站点可能被伪造。 HTTPS是HTTP加上TLS/SSL协议构建的可进行加密传输、身份认
长话短说 前文《解剖HttpClientFactory,自由扩展HttpMessageHandler》主要讲如何为HttpClientFactory自定义HttpMessageHandler组件, 现在来完成课后的小作业: 将重点日志字段显示到Nlog的LayoutRenderer上。 本文实现一个
引言问题 作为资深老鸟,有事没事,出去面试;找准差距、定位价值。 面试必谈哈希, Q1:什么是哈希? Q2:哈希为什么快? Q3:你是怎么理解哈希算法利用空间换取时间的? Q4:你是怎么解决哈希冲突的? Q5:你有实际用写过哈希算法吗? 知识储备 哈希(也叫散列)是一种查找算法(可用于插入),哈希算
前言 如题,有感于博客园最近多次翻车,感觉像胡子眉毛一把抓, 定位不了生产环境的问题。 抛开流程问题,思考在生产环境中如何做故障排除,&#160;发现博客园里面这方面的文章比较少。 .Net 本身是提供了sos.dll工具帮助我们在生产中故障排除,通过提供有关内部公共语言运行时(CLR)环境的信息,
.NET程序是基于.NET Framework、.NET Core、Mono、【.NET实现】开发和运行的 ,定义以上【.NET实现】的标准规范称为.NET Standard .NET Standard .NET标准是一组API集合,由上层三种【.NET实现】的Basic Class Library
长话短说 上个月公司上线了一个物联网数据科学项目,我主要负责前端接受物联网事件,并提供 参数下载。 webapp 部署在Azure云上,参数使用Azure SQL Server存储。 最近从灰度测试转向全量部署之后,日志时常收到: SQL Session超限报错。 排查 我在Azure上使用的是 S
临近年关,搜狗,360浏览器出现页面无法成功跳转,同域Cookie丢失? 也许是服务端 SameSite惹的祸。&#xA;本文揭示由于Chrome低版本内核不识别 SameSite= None, 引发的单点登录故障。
本文聊一聊TraceID的作用和一般组成,衍生出ASP. NETCore 单体和分布式程序中 TraceId 的使用方式
通过给 HttpClint请求的日志增加 TraceId,解锁自定义扩展 HttpClientFacroty 的姿势