C#数据结构-线程安全队列

什么是线程安全?

答:线程安全是多线程编程时的计算机程序代码中的一个概念。在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。

前面几篇写的线性结构,在多线程并行的情况下会出现共享数据会线程间读取与写入不一直的情况,为了解决这种情况,通常会使用锁来解决,也就是将并行改为串行。但是在使用穿行违背了使用多线程并发的初衷,这种情况下我们可以考虑采用线程安全结构。

先看下线程安全队列的用法:

ConcurrentQueue<int> ts = new  System.Collections.Concurrent.ConcurrentQueue<int>();
ts.Enqueue(1);
ts.Enqueue(234);
foreach (var r in ts)
{
    Console.Write($"data:{r} ");
}
Console.WriteLine();
ts.TryPeek(out int pk);
Console.WriteLine($peek:{pk});
ts.TryDequeue( ck);
ts.Enqueue(56);
Console.WriteLine();
);
}
Console.WriteLine();
Console.ReadLine();

现在我们看下线程安全队列的实现方式:(参考自:.net framework 4.8),核心代码全部做了注释。

 

总的来说,(总结语放到前面,防止代码篇幅太大,同志们没有耐心翻到最底下~)

1、线程安全队列通过SpinWait自旋类来实现等待并行线程完成与Interlocked原子操作类计数实现的。

2、线程安全队列通过单向链表实现的,链的节点为长度32的数组,通过记录链的头节点与尾节点、以及队列的头尾实现队列的存储与入队、出队操作的。

 

 

public class MyConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    [NonSerialized]
    private volatile Segment m_head;

    [NonSerialized]
     Segment m_tail;

    private T[] m_serializationArray;

    const int SEGMENT_SIZE = 32;

    [NonSerialized]
    internal volatile int m_numSnapshotTakers = 0;
    /// <summary>
    /// 链尾部节点
    </summary>
    public MyConcurrentQueue()
    {
        m_head = m_tail = new Segment(0,this);
    }
    //尝试添加
    bool IProducerConsumerCollection<T>.TryAdd(T item)
    {
        Enqueue(item);
        return true;
    }
     尝试从中移除并返回对象
    </summary>
    <param name="item">
    </remarks>
    bool IProducerConsumerCollection<T>.TryTake(out T item)
    {
        return TryDequeue( item);
    }
     判断当前链是否为空
    bool IsEmpty
    {
        get
        {
            Segment head = m_head;
            if (!head.IsEmpty)
                如果头不为空,则链非空
                false;
            else if (head.Next == null)
                如果头节点的下一个节点为空,且为链尾,
                else
            如果头节点为空且不是最后一个节点 ,则标识另一个线程正在写入该数组
            等待中..
            {
                SpinWait spin = new SpinWait();
                while (head.IsEmpty)
                {
                    此时为空
                    )
                        ;
                    否则标识正在有线程占用写入
                    线程循环一次
                    spin.SpinOnce();
                    head = m_head;
                }
                ;
            }
        }
    }
     用来判断链是否在变化
    <param name="head"></param>
    <param name="tail"></param>
    <param name="headLow"></param>
    <param name="tailHigh"></param>
    void GetHeadTailPositions(out Segment head,1)"> Segment tail,int headLow,1)"> tailHigh)
    {
        head = m_head;
        tail = m_tail;
        headLow = head.Low;
        tailHigh = tail.High;
        SpinWait spin =  SpinWait();
        Console.WriteLine($head.Low:{head.Low},tail.High:{tail.High},head.m_index:{head.m_index},tail.m_index:{tail.m_index});
        通过循环来保证值不再更改(也就是说并行线程操作结束)
        保证线程串行核心的判断逻辑
         (
            头尾发生变化
            head != m_head || tail != m_tail
            如果队列头、尾索引发生变化
            || headLow != head.Low || tailHigh != tail.High
            || head.m_index > tail.m_index)
        {
            spin.SpinOnce();
            head = m_head;
            tail = m_tail;
            headLow = head.Low;
            tailHigh = tail.High;
        }
    }
     获取总数
     Count
    {
        
        {
            Segment head,tail;
             headLow,tailHigh;
            GetHeadTailPositions(out head,1)">out tail,1)">out headLow,1)"> tailHigh);
            if (head == tail)
            {
                return tailHigh - headLow + ;
            }
            头节点长度
            int count = SEGMENT_SIZE - headLow;
            加上中间其他节点长度
            count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - ));
            加上尾节点长度
            count += tailHigh + return count;
        }
    }

    object SyncRoot => throw  NotImplementedException();

    bool IsSynchronized => void CopyTo(T[] array,1)"> index)
    {
        
    }
     暂未实现
    <returns></returns>
    public IEnumerator<T> GetEnumerator()
    {
         添加
    <param name="item"></param>
    void Enqueue(T item)
    {
        SpinWait spin =  SpinWait();
        while ()
        {
            Segment tail = m_tail;
            if (tail.TryAppend(item))
                ;
            spin.SpinOnce();
        }
    }
     尝试删除节点
    <param name="result"></param>
    bool TryDequeue( T result)
    {
        while (!IsEmpty)
        {
            Segment head =if (head.TryRemove( result))
                ;
        }
        result = default(T);
         查看最后一个添加入的元素
    bool TryPeek(原子增加值
        Interlocked.Increment(ref m_numSnapshotTakers);


        IsEmpty)
        {
            首先从头节点看一下第一个节点是否存在
            Segment head =if (head.TryPeek( result))
            {
                Interlocked.Decrement( m_numSnapshotTakers);
                ;
            }
        }
        result = (T);
        Interlocked.Decrement( m_numSnapshotTakers);
        ;
    }

    void CopyTo(Array array,1)"> index)
    {
         NotImplementedException();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
         NotImplementedException();
    }

     T[] ToArray()
    {
         NotImplementedException();
    }
     为线程安全队列提供一个 单向链表,
     链表的每个节点存储长度为32的数组
    class Segment
    {
        <summary>
         定义一个数组,用于存储每个节点的内容
        </summary>
         T[] m_array;
         定义一个结构数组,用于标识数组中每个节点是否有效(是否存储内容)
         VolatileBool[] m_state;
        指针,指向下一个节点数组
        如果是最后一个节点,则节点为空
         Segment m_next;
         索引,用来存储链表的长度
        readonly long m_index;
         用来标识队列头-数组弹出索引
         m_low;
         用来标识队列尾-数组最新存储位置
         m_high;
         用来标识队列
        volatile MyConcurrentQueue<T> m_source;
         实例化链节点
        internal Segment(long index,MyConcurrentQueue<T> source)
        {
            m_array =  T[SEGMENT_SIZE];
            m_state = new VolatileBool[SEGMENT_SIZE]; all initialized to false
            m_high = -;
            m_index = index;
            m_source = source;
        }
         链表的下一个节点
        internal Segment Next
        {
            get {  m_next; }
        }
         如果当前节点数组为空返回true,
         IsEmpty
        {
            return (Low > High); }
        }
         非安全添加方法(无判断数组长度)
        </summary>
        <param name="value"></param>
         UnsafeAdd(T value)
        {
            m_high++;
            m_array[m_high] = value;
            m_state[m_high].m_value = ;
        }


         Segment UnsafeGrow()
        {
            Segment newSegment = new Segment(m_index + ,m_source);
            m_next = newSegment;
             newSegment;
        }


         如果当前数组满了 >=32,则链扩展节点。
         Grow()
        {
            重新船舰数组
            Segment newSegment = 赋值给next指针
            m_next =将节点添加到链
            m_source.m_tail = m_next;
        }

         在末尾添加元素
        <param name="value">元素</param>
        <param name="tail">The tail.<returns>如果附加元素,则为true;如果当前数组已满,则为false</returns>
        <remarks>如果附加指定的元素成功,并且在此之后数组满了,在链上添加新节点(节点为32长度数组)    </remarks>
         TryAppend(T value)
        {
            如果数组已满则跳出方法
            if (m_high >= SEGMENT_SIZE - )
            {
                局部变量初始化
            int newhigh = SEGMENT_SIZE;
            try
            { }
            finally
            {
                原子递增
                newhigh = Interlocked.Increment( m_high);
                if (newhigh <= SEGMENT_SIZE - )
                {
                    m_array[newhigh] = value;
                    m_state[newhigh].m_value = ;
                }
                如果数组满了,则扩展链节点。
                if (newhigh == SEGMENT_SIZE - )
                {
                    Grow();
                }
            }
            如果 newhigh <= SEGMENT_SIZE-1,这意味着当前线程成功地占据了一个位置
            return newhigh <= SEGMENT_SIZE - ;
        }

         尝试从链的头部数组删除节点
        <param name="result"></param>
        <returns></returns>
        bool TryRemove( T result)
        {
            SpinWait spin =  SpinWait();
            int lowLocal = Low,highLocal = High;
            while (lowLocal <= highLocal)
            {
                获取队头索引
                if (Interlocked.CompareExchange(ref m_low,lowLocal + 1,lowLocal) == lowLocal)
                {
                    如果要弹出队列的值不可用,说明这个位置被并行线程获取到了权限,但是值还未写入。
                    通过线程自旋等待值写入
                    SpinWait spinLocal =  SpinWait();
                    m_state[lowLocal].m_value)
                    {
                        spinLocal.SpinOnce();
                    }
                    取出值
                    result = m_array[lowLocal];
                     如果没有其他线程读取(GetEnumerator()、ToList()) 执行删除
                     如 TryPeek 的时候m_numSnapshotTakers会在进入方法体时++,在出方法体--
                     清空该索引下的值
                    if (m_source.m_numSnapshotTakers <= )
                        m_array[lowLocal] = (T);
                    如果说lowLocal+1 = 32 说明当前链节点的数组已经全部出队
                    if (lowLocal + 1 >= SEGMENT_SIZE)
                    {
                        由于lowLocal <= highLocal成立
                        lowLocal + 1 >= SEGMENT_SIZE 如果成立,且m_next == null 成立,
                        说明在此时有其他线程正在做扩展链结构
                        那么当前线程需要等待其他线程完成扩展链表,再做出队操作。
                        spinLocal =  SpinWait();
                        while (m_next == )
                        {
                            spinLocal.SpinOnce();
                        }
                        m_source.m_head = m_next;
                    }
                    else
                {
                    此时说明 当前线程竞争资源失败,做短暂自旋后继续竞争资源
                    spin.SpinOnce();
                    lowLocal = Low; highLocal = High;
                }
            }
            失败的情况下返回空值
            result = (T);
            ;
        }
         尝试获取队列头节点元素
         T result)
        {
            result = int lowLocal = Low;
            校验当前队列是否正确
            if (lowLocal > High)
                ;
            SpinWait spin = 如果头节点无效,则说明当前节点被其他线程占用,并在做写入操作,
            需要等待其他线程写入后再执行读取操作
            m_state[lowLocal].m_value)
            {
                spin.SpinOnce();
            }
            result = m_array[lowLocal];
             返回队列首位置
         Low
        {
             Math.Min(m_low,SEGMENT_SIZE);
            }
        }
         获取队列长度    
         High
        {
            如果m_high>SEGMENT_SIZE,则表示超出范围,我们应该返回 SEGMENT_SIZE-1
                return Math.Min(m_high,SEGMENT_SIZE - );
            }
        }
    }
}
<summary>
 结构-用来存储整数组每个索引上是否存储值
</summary>
struct VolatileBool
{
    public VolatileBool( value)
    {
        m_value = value;
    }
     m_value;
}

代码通篇看下来有些长(已经精简了很多,只实现入队、出队、与查看下一个出队的值),不知道有多少人能翻到这里~

说明:

1、TryAppend方法通过Interlocked.Increment()原子递增方法获取下一个数组存储点,通过比对32判断链是否需要增加下一个链节点,也就是说,链的存储空间每次扩展为32个存储位置。

2、TryRemove方法通过 Interlocked.CompareExchange()方法来判断当前是否有并行线程在写入,如果有则通过 while循环 SpinWait类的SpinOnce()方法实现等待写入完成后,再做删除;特别说明,判断是否写入是靠VolatileBool结构来实现的,每个链表的每个节点在存储值的同时每个存储都对应一个VolatileBool结构用来标识当前写入点是否成功写入。特殊情况,如果当前链节点的数组已经空了,则需要pinWait类的SpinOnce()简短的自旋等待并行的写入方法完成扩展链后,再做删除。

3、TryPeek方法,同样会判断要获取的元素是否已经成功写入(不成功则说明并行线程还未完成写入),如果未完成,则通过 while pinWait类的SpinOnce()来等待写入完成后,再读取元素内容。

现在代码已经看完了,来试下:

MyConcurrentQueue<string> myConcurrentQueue = new MyConcurrentQueue<string>();
for (int i = 0; i < 67; i++)
{
    myConcurrentQueue.Enqueue($第{i}位);
    Console.WriteLine($总数:{myConcurrentQueue.Count});
}

myConcurrentQueue.TryPeek(string rs);
Console.WriteLine($TryPeek 总数:{myConcurrentQueue.Count}34; i++)
{
    myConcurrentQueue.TryDequeue( result0);
    Console.WriteLine($TryDequeue 总数:{myConcurrentQueue.Count});
}
Console.ReadKey();

打印:

head.Low:0,head.m_index:0,tail.m_index:
总数:
head.Low:1,head.m_index:2,head.m_index:3,head.m_index:4,head.m_index:5,head.m_index:6,head.m_index:77,head.m_index:88,head.m_index:99,head.m_index:1010,head.m_index:1111,head.m_index:1212,head.m_index:1313,head.m_index:1414,head.m_index:1515,head.m_index:1616,head.m_index:1717,head.m_index:1818,head.m_index:1919,head.m_index:2020,head.m_index:2121,head.m_index:2222,head.m_index:2323,head.m_index:2424,head.m_index:2525,head.m_index:2626,head.m_index:2727,head.m_index:2828,head.m_index:2929,head.m_index:3030,head.m_index:313334353637383940414243444546474849505152535455565758596061626364656667
TryPeek 总数:
TryDequeue 总数:2,1)">3,1)">4,1)">5,1)">6,1)">7,1)">8,1)">9,1)">10,1)">11,1)">12,1)">13,1)">14,1)">15,1)">16,1)">17,1)">18,1)">19,1)">20,1)">21,1)">22,1)">23,1)">24,1)">25,1)">26,1)">27,1)">28,1)">29,1)">30,1)">31,1)">1,tail.m_index:33

有时间希望大家能将代码跑一下,相信会更明白其中的原理。

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


引言 本文从Linux小白的视角, 在CentOS 7.x服务器上搭建一个Nginx-Powered AspNet Core Web准生产应用。 在开始之前,我们还是重温一下部署原理,正如你所常见的.Net Core 部署图: 在Linux上部署.Net Core App最好的方式是在Linux机器
引言: 多线程编程/异步编程非常复杂,有很多概念和工具需要去学习,贴心的.NET提供Task线程包装类和await/async异步编程语法糖简化了异步编程方式。 相信很多开发者都看到如下异步编程实践原则: 遵守以上冷冰冰的②③条的原则,可保证异步程序按照预期状态正常运作;我们在各大编程论坛常看到违背
一. 宏观概念 ASP.NET Core Middleware是在应用程序处理管道pipeline中用于处理请求和操作响应的组件。 每个组件是pipeline 中的一环。 自行决定是否将请求传递给下一个组件 在处理管道的下个组件执行之前和之后执行业务逻辑 二. 特性和行为 ASP.NET Core处
背景 在.Net和C#中运行异步代码相当简单,因为我们有时候需要取消正在进行的异步操作,通过本文,可以掌握 通过CancellationToken取消任务(包括non-cancellable任务)。 Task&#160;表示无返回值的异步操作, 泛型版本Task&lt;TResult&gt;表示有返
HTTP基本认证 在HTTP中,HTTP基本认证(Basic Authentication)是一种允许网页浏览器或其他客户端程序以(用户名:口令) 请求资源的身份验证方式,不要求cookie,session identifier、login page等标记或载体。 - 所有浏览器据支持HTTP基本认
1.Linq 执行多列排序 OrderBy的意义是按照指定顺序排序,连续两次OrderBy,后面一个有可能会打乱前面一个的排序顺序,可能与预期不符。 要实现sql中的order by word,name类似效果; LINQ 有ThenBy可以紧接使用, ThenBy记住原本排序的值,然后再排其他值,
ASP.NET Core 核心特性:开源、跨平台、高性能是其决战JAVA的必胜法宝,最引人关注的跨平台特性 到底是怎么实现? &#xA; 本文分Unix、Windows剖析跨平台内幕,读完让你大呼过瘾。
前导 Asynchronous programming Model(APM)异步编程模型以BeginMethod(...) 和 EndMethod(...)结对出现。 IAsyncResult BeginGetResponse(AsyncCallback callback, object state
引言 最近在公司开发了一个项目,项目部署架构图如下: 思路 如图中文本所述,公司大数据集群不允许直接访问外网,需要一个网关服务器代理请求,本处服务器A就是边缘代理服务器的作用。 通常技术人员最快捷的思路是在服务器A上部署IISʺpplication Request Routing Module组件
作为一枚后端程序狗,项目实践常遇到定时任务的工作,最容易想到的的思路就是利用Windows计划任务/wndows service程序/Crontab程序等主机方法在主机上部署定时任务程序/脚本。 但是很多时候,若使用的是共享主机或者受控主机,这些主机不允许你私自安装exe程序、Windows服务程序
引言 熟悉TPL Dataflow博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工作流任务, 在使用Docker部署的过程中, 有一个问题一直无法回避: 在单体程序部署的瞬间(服务不可用)会有少量流量无法处理;更糟糕的情况下,迭代部署的这个版本有问题,上线后无法运作, 更多的流
合格的web后端程序员,除搬砖技能,还必须会给各种web服务器配置Https,本文结合ASP.NET Core部署模型聊一聊启用Https的方式。 温故知新 目前常见的Http请求明文传输,请求可能被篡改,访问的站点可能被伪造。 HTTPS是HTTP加上TLS/SSL协议构建的可进行加密传输、身份认
长话短说 前文《解剖HttpClientFactory,自由扩展HttpMessageHandler》主要讲如何为HttpClientFactory自定义HttpMessageHandler组件, 现在来完成课后的小作业: 将重点日志字段显示到Nlog的LayoutRenderer上。 本文实现一个
引言问题 作为资深老鸟,有事没事,出去面试;找准差距、定位价值。 面试必谈哈希, Q1:什么是哈希? Q2:哈希为什么快? Q3:你是怎么理解哈希算法利用空间换取时间的? Q4:你是怎么解决哈希冲突的? Q5:你有实际用写过哈希算法吗? 知识储备 哈希(也叫散列)是一种查找算法(可用于插入),哈希算
前言 如题,有感于博客园最近多次翻车,感觉像胡子眉毛一把抓, 定位不了生产环境的问题。 抛开流程问题,思考在生产环境中如何做故障排除,&#160;发现博客园里面这方面的文章比较少。 .Net 本身是提供了sos.dll工具帮助我们在生产中故障排除,通过提供有关内部公共语言运行时(CLR)环境的信息,
.NET程序是基于.NET Framework、.NET Core、Mono、【.NET实现】开发和运行的 ,定义以上【.NET实现】的标准规范称为.NET Standard .NET Standard .NET标准是一组API集合,由上层三种【.NET实现】的Basic Class Library
长话短说 上个月公司上线了一个物联网数据科学项目,我主要负责前端接受物联网事件,并提供 参数下载。 webapp 部署在Azure云上,参数使用Azure SQL Server存储。 最近从灰度测试转向全量部署之后,日志时常收到: SQL Session超限报错。 排查 我在Azure上使用的是 S
临近年关,搜狗,360浏览器出现页面无法成功跳转,同域Cookie丢失? 也许是服务端 SameSite惹的祸。&#xA;本文揭示由于Chrome低版本内核不识别 SameSite= None, 引发的单点登录故障。
本文聊一聊TraceID的作用和一般组成,衍生出ASP. NETCore 单体和分布式程序中 TraceId 的使用方式
通过给 HttpClint请求的日志增加 TraceId,解锁自定义扩展 HttpClientFacroty 的姿势