统一流控服务开源:基于.Net Core的流控服务 统一流控服务开源-1:场景&业界做法&算法篇

先前有一篇博文,梳理了流控服务的场景、业界做法和常用算法

统一流控服务开源-1:场景&业界做法&算法篇

最近完成了流控服务的开发,并在生产系统进行了大半年的验证,稳定可靠。今天整理一下核心设计和实现思路,开源到Github上,分享给大家

     https://github.com/zhouguoqing/FlowControl

 一、令牌桶算法实现

  先回顾一下令牌桶算法示意图

  

  

  随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms) 往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),

 

  如果桶已经满了就不再加了. 新请求来临时, 会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

 

  令牌添加速度支持动态变化,实时控制处理的速率.

  令牌桶有两个关键的属性:令牌桶容量(大小)和时间间隔,

  有两个关键操作,从令牌桶中取Token;令牌桶定时的Reset重置。

  我们看TokenBucket类:

using System;

namespace CZ.FlowControl.Service
{
     CZ.FlowControl.Spi;
    /// <summary>
    /// 令牌桶
    </summary>
    public abstract class TokenBucket : IThrottleStrategy
    {
        protected long bucketTokenCapacity;
        private static readonly object syncRoot = new object();
         ticksRefillInterval;
         nextRefillTime;

        //number of tokens in the bucket
         tokens;

        protected TokenBucket(long bucketTokenCapacity,long refillInterval,1)"> refillIntervalInMilliSeconds)
        {
            if (bucketTokenCapacity <= 0)
                throw new ArgumentOutOfRangeException("bucketTokenCapacity",bucket token capacity can not be negative");
            if (refillInterval < refillIntervalRefill interval cannot be negativeif (refillIntervalInMilliSeconds <= refillIntervalInMilliSecondsRefill interval in milliseconds cannot be negative);

            this.bucketTokenCapacity = bucketTokenCapacity;
            ticksRefillInterval = TimeSpan.FromMilliseconds(refillInterval * refillIntervalInMilliSeconds).Ticks;
        }

        <summary>
         是否流控
        </summary>
        <param name="n"></param>
        <returns></returns>
        bool ShouldThrottle(long n = 1)
        {
            TimeSpan waitTime;
            return ShouldThrottle(n,1)">out waitTime);
        }
        bool ShouldThrottle(long n,1)"> TimeSpan waitTime)
        {
            if (n <= 0) nShould be positive integerlock (syncRoot)
            {
                UpdateTokens();
                if (tokens < n)
                {
                    var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks;
                    if (timeToIntervalEnd <  waitTime);

                    waitTime = TimeSpan.FromTicks(timeToIntervalEnd);
                    return true;
                }
                tokens -= n;

                waitTime = TimeSpan.Zero;
                false;
            }
        }

         更新令牌
        </summary>
        void UpdateTokens();

        return ShouldThrottle(1,1)"> waitTime);
        }

         CurrentTokenCount
        {
            get
            {
                 (syncRoot)
                {
                    UpdateTokens();
                    return tokens;
                }
            }
        }
    }
}

 这个抽象类中,将UpdateToken作为抽象方法暴露出来,给实现类更多的灵活去控制令牌桶重置操作。基于此实现了“固定令牌桶”FixedTokenBucket

    ///  固定令牌桶
     FixedTokenBucket : TokenBucket
    {
        public FixedTokenBucket(long maxTokens,1)"> refillIntervalInMilliSeconds)
            : base(maxTokens,refillInterval,refillIntervalInMilliSeconds)
        {
        }

        override  UpdateTokens()
        {
            var currentTime = SystemTime.UtcNow.Ticks;

            if (currentTime < nextRefillTime)
                ;

            tokens = bucketTokenCapacity;
            nextRefillTime = currentTime + ticksRefillInterval;
        }
    }

   固定令牌桶在每次取Token时,都要执行方法ShouldThrottle。这个方法中:

   并发取Token是线程安全的,这个地方用了Lock控制,损失了一部分性能。同时每次获取可用Token的时候,都会实时Check一下是否需要到达Reset令牌桶的时间。

   获取到可用令牌后,令牌桶中令牌的数量-1。如果没有足够的可用令牌,则返回等待到下次Reset令牌桶的时间。如下代码:

        );

            lock (syncRoot)
            {
                UpdateTokens();
                ;
                }
                tokens -=;
            }
        }

   以上就是令牌桶算法的实现。我们继续看漏桶算法。

 二、漏桶算法实现

  首先回顾一下漏桶算法的原理:

  

  

  水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),

 

  当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,

 

  可以看出漏桶算法能强行限制数据的传输速率.

 

  有两个变量:

 

  • 一个是桶的大小,支持流量突发增多时可以存多少的水(burst),
  • 另一个是水桶漏洞的大小(rate)。

 

   漏桶抽象类:LeakTokenBucket,继承与令牌桶抽象父类 TokenBucket,说明了获取令牌(漏出令牌)在底层的方式是一致的,不一样的是重置令牌的方式(务必理解这一点)

 CZ.FlowControl.Service
{
     漏桶
     LeakyTokenBucket : TokenBucket
    {
         stepTokens;
         ticksStepInterval;

        protected LeakyTokenBucket(int refillIntervalInMilliSeconds,long stepTokens,1)">long stepInterval,1)"> stepIntervalInMilliseconds)
            : this.stepTokens = stepTokens;
            if (stepInterval < stepIntervalStep interval cannot be negativeif (stepTokens < stepTokensStep tokens cannot be negativeif (stepIntervalInMilliseconds <= stepIntervalInMillisecondsStep interval in milliseconds cannot be negative);

            ticksStepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalInMilliseconds).Ticks;
        }
    }
}

    可以看出,漏桶是在令牌桶的基础上增加了二个重要的属性:这两个属性决定了重置令牌桶的方式

    stepTokens:每间隔时间内漏的数量

    ticksStepInterval:漏的间隔时间

    举个例子:TPS 100,即每秒漏出100个Token,stepTokens =100, ticksStepInterval=1000ms

    漏桶的具体实现有两种:空桶和满桶

    StepDownTokenBucket 满桶:即一把将令牌桶填充满

 漏桶(满桶)
    </summary>
    <remarks>
     StepDownLeakyTokenBucketStrategy resembles a bucket which has been filled with tokens at the beginning but subsequently leaks tokens at a fixed interval
    </remarks>
     StepDownTokenBucket : LeakyTokenBucket
    {
        public StepDownTokenBucket(int refillIntervalInMilliSeconds,1)">int stepIntervalInMilliseconds) : if (currentTime >= nextRefillTime)
            {
                set tokens to max
                tokens = bucketTokenCapacity;

                compute next refill time
                nextRefillTime = currentTime + ticksRefillInterval;
                ;
            }

            calculate max tokens possible till the end
            var timeToNextRefill = nextRefillTime - currentTime;
            var stepsToNextRefill = timeToNextRefill/ticksStepInterval;

            var maxPossibleTokens = stepsToNextRefill*stepTokens;

            if ((timeToNextRefill%ticksStepInterval) > 0) maxPossibleTokens +=if (maxPossibleTokens < tokens) tokens = maxPossibleTokens;
        }
    }
}
View Code

   StepUpLeakyTokenBucket 空桶:即每次只将stepTokens个数的令牌放到桶中   

 1  System;
 2 
 3  CZ.FlowControl.Service
 4 {
 5     <summary>
 6      漏桶(空桶)
 7     </summary>
 8     <remarks>
 9       StepUpLeakyTokenBucketStrategy resemembles an empty bucket at the beginning but get filled will tokens over a fixed interval.
10     </remarks>
11      StepUpLeakyTokenBucket : LeakyTokenBucket
12     {
13          lastActivityTime;
14 
15         public StepUpLeakyTokenBucket( stepIntervalInMilliseconds) 
16             : 17         {
18         }
19 
20          UpdateTokens()
21 22              SystemTime.UtcNow.Ticks;
23 
24              nextRefillTime)
25             {
26                 tokens = stepTokens;
27 
28                 lastActivityTime = currentTime;
29                 nextRefillTime = currentTime + ticksRefillInterval;
30 
31                 ;
32             }
33 
34             calculate tokens at current step
35 
36             long elapsedTimeSinceLastActivity = currentTime -37             long elapsedStepsSinceLastActivity = elapsedTimeSinceLastActivity / ticksStepInterval;
38 
39             tokens += (elapsedStepsSinceLastActivity*stepTokens);
40 
41             if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity;
42             lastActivityTime =43 44     }
45 }
View Code

 三、流控服务封装

  第二章节,详细介绍了令牌桶和漏桶的具体实现。基于以上,要重点介绍接口:IThrottleStrategy:流控的具体方式

 CZ.FlowControl.Spi
{
     流量控制算法策略
    interface IThrottleStrategy
    {
        );

        <param name="waitTime"></param>
         TimeSpan waitTime);

         当前令牌个数
        long CurrentTokenCount { ; }
    }
}

    有了这个流控方式接口后,我们还需要一个流控策略定义类:FlowControlStrategy

    即定义具体的流控策略:以下是这个类的详细属性和成员:  不仅定义了流控策略类型,还定义了流控的维度信息和流控阈值,这样流控就做成依赖注入的方式了! 

 System.Collections.Generic;
 System.Text;

 流控策略
     FlowControlStrategy
    {
         标识
        string ID { get; set; }

         名称
        string Name {  流控策略类型
        public FlowControlStrategyType StrategyType {  流控阈值-Int
        long IntThreshold {  流控阈值-Double
        decimal DoubleThreshold {  时间区间跨度
        public FlowControlTimespan TimeSpan { ; }

        private Dictionary<string,1)">string> flowControlConfigs;

         流控维度信息
        public Dictionary< FlowControlConfigs
        {
            if (flowControlConfigs == null)
                    flowControlConfigs = new Dictionary<();

                 flowControlConfigs;
            }
            
            {
                flowControlConfigs = value;
            }
        }

         描述
        string Descriptions {  触发流控后是否直接拒绝请求
        </summary>        
        bool IsRefusedRequest {  创建时间
        public DateTime CreateTime {  创建人
        string Creator {  最后修改时间
        public DateTime LastModifyTime {  最后修改人
        string LastModifier { ; }
    }
}

   同时,流控策略类型,我们抽象了一个枚举:FlowControlStrategyType

   支持3种流控策略:TPS、Sum(指定时间段内请求的次数),Delay延迟

 流控策略类型枚举
    enum FlowControlStrategyType
    {
         TPS控制策略
                TPS, 总数控制策略
                Sum,1)"> 延迟控制策略
                Delay
    }
}

  面向每种流控策略类型,提供了一个对应的流控器,比如说TPS的流控器

TPSFlowController,内部使用了固定令牌桶算法
 CZ.FlowControl.Spi;

     TPS流量控制器
     TPSFlowController : IFlowController
    {
        public IThrottleStrategy InnerThrottleStrategy
        {
            ;
        }

        public FlowControlStrategy FlowControlStrategy { return InnerThrottleStrategy.ShouldThrottle(n,1)"> TPSFlowController(FlowControlStrategy strategy)
        {
            FlowControlStrategy = strategy;

            InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold,1)">1000);
        }
    }
}

  Sum(指定时间段内请求的次数)流控器:

  

 System.IO;
 System.Linq;
 一段时间内合计值流量控制器
     SumFlowController : IFlowController
    {
         SumFlowController(FlowControlStrategy strategy)
        {
            FlowControlStrategy = strategy;

            var refillInterval = GetTokenBucketRefillInterval(strategy);

            InnerThrottleStrategy = );
        }

         GetTokenBucketRefillInterval(FlowControlStrategy strategy)
        {
            long refillInterval = ;

            switch (strategy.TimeSpan)
            {
                case FlowControlTimespan.Second:
                    refillInterval = ;
                    break;
                 FlowControlTimespan.Minute:
                    refillInterval = 60 FlowControlTimespan.Hour:
                    refillInterval = 60 *  FlowControlTimespan.Day:
                    refillInterval = 24 *  refillInterval;
        }
    }
}

  同时,通过一个创建者工厂,根据不同的流控策略,创建对应的流控器(做了一层缓存,性能更好):

 流控策略工厂
     FlowControllerFactory
    {
        static Dictionary< fcControllers;
        object syncObj = ();

        static FlowControllerFactory instance;

        private FlowControllerFactory()
        {
            fcControllers = ();
        }

         FlowControllerFactory GetInstance()
        {
            if (instance == )
            {
                 (syncObj)
                {
                    )
                    {
                        instance = new FlowControllerFactory();
                    }
                }
            }

             instance;
        }

         IFlowController GetOrCreateFlowController(FlowControlStrategy strategy)
        {
            if (strategy == new ArgumentNullException(FlowControllerFactory.GetOrCreateFlowController.strategyif (!fcControllers.ContainsKey(strategy.ID))
            {
                fcControllers.ContainsKey(strategy.ID))
                    {
                        var fcController = CreateFlowController(strategy);
                        if (fcController != )
                            fcControllers.Add(strategy.ID,fcController);
                    }
                }
            }

            if (fcControllers.ContainsKey(strategy.ID))
            {
                var controller = fcControllers[strategy.ID];
                 controller;
            }

             IFlowController CreateFlowController(FlowControlStrategy strategy)
        {
            FlowControllerFactory.CreateFlowController.strategy);

            IFlowController controller =  (strategy.StrategyType)
            {
                 FlowControlStrategyType.TPS:
                    controller =  TPSFlowController(strategy);
                     FlowControlStrategyType.Delay:
                    controller =  DelayFlowController(strategy);
                     FlowControlStrategyType.Sum:
                    controller =  SumFlowController(strategy);
                    default:
                     controller;
        }
    }
}

 

   有了流控策略定义、我们更进一步,继续封装了流控Facade服务,这样把流控的变化封装到内部。对外只提供流控服务接口,流控时动态传入流控策略和流控个数:FlowControlService

   

 CZ.FlowControl.Spi;
     System.Threading;

     统一流控服务
     FlowControlService
    {
         流控
        <param name="strategy">流控策略</param>
        <param name="count">请求次数</param>
        void FlowControl(FlowControlStrategy strategy,1)">int count = )
        {
             FlowControllerFactory.GetInstance().GetOrCreateFlowController(strategy);

            TimeSpan waitTimespan = TimeSpan.Zero;

            var result = controller.ShouldThrottle(count,1)"> waitTimespan);
             (result)
            {
                if (strategy.IsRefusedRequest == false && waitTimespan != TimeSpan.Zero)
                {
                    WaitForAvailable(strategy,controller,waitTimespan,count);
                }
                else  (strategy.IsRefusedRequest)
                {
                    new Exception(触发流控!);
                }
            }
        }

         等待可用
        <param name="controller">流控器<param name="waitTimespan">等待时间void WaitForAvailable(FlowControlStrategy strategy,IFlowController controller,TimeSpan waitTimespan,1)"> count)
        {
            var timespan = waitTimespan;
            if (strategy.StrategyType == FlowControlStrategyType.Delay)
            {
                Thread.Sleep(timespan);
                while (controller.ShouldThrottle(count,1)"> timespan))
            {
                Thread.Sleep(timespan);
            }
        }
    }
}

  以上,统一流控服务完成了第一个版本的封装。接下来我们看示例代码

 四、示例代码

    先安装Nuget:


Install-Package CZ.FlowControl.Service -Version 1.0.0

 

    

   

    是不是很简单。

    大家如果希望了解详细的代码,请参考这个项目的GitHub地址:

    https://github.com/zhouguoqing/FlowControl

    同时也欢迎大家一起改进完善。

    

 

周国庆

2019/8/9

    

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


在上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。事实上,不仅仅是在事件处理器
上文已经介绍了Identity Service的实现过程。今天我们继续,实现一个简单的Weather API和一个基于Ocelot的API网关。 回顾 《Angular SPA基于Ocelot API网关与IdentityServer4的身份认证与授权(一)》 Weather API Weather
最近我为我自己的应用开发框架Apworks设计了一套案例应用程序,并以Apache 2.0开源,开源地址是:https://github.com/daxnet/apworks-examples,目的是为了让大家更为方便地学习和使用.NET Core、最新的前端开发框架Angular,以及Apwork
HAL(Hypertext Application Language,超文本应用语言)是一种RESTful API的数据格式风格,为RESTful API的设计提供了接口规范,同时也降低了客户端与服务端接口的耦合度。很多当今流行的RESTful API开发框架,包括Spring REST,也都默认支
在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、通过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就需要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构本身就是事件驱动的,因此,
HAL,全称为Hypertext Application Language,它是一种简单的数据格式,它能以一种简单、统一的形式,在API中引入超链接特性,使得API的可发现性(discoverable)更强,并具有自描述的特点。使用了HAL的API会更容易地被第三方开源库所调用,并且使用起来也很方便
何时使用领域驱动设计?其实当你的应用程序架构设计是面向业务的时候,你已经开始使用领域驱动设计了。领域驱动设计既不是架构风格(Architecture Style),也不是架构模式(Architecture Pattern),它也不是一种软件开发方法论,所以,是否应该使用领域驱动设计,以及什么时候使用
《在ASP.NET Core中使用Apworks快速开发数据服务》一文中,我介绍了如何使用Apworks框架的数据服务来快速构建用于查询和管理数据模型的RESTful API,通过该文的介绍,你会看到,使用Apworks框架开发数据服务是何等简单快捷,提供的功能也非常多,比如对Hypermedia的
在上一讲中,我们已经完成了一个完整的案例,在这个案例中,我们可以通过Angular单页面应用(SPA)进行登录,然后通过后端的Ocelot API网关整合IdentityServer4完成身份认证。在本讲中,我们会讨论在当前这种架构的应用程序中,如何完成用户授权。 回顾 《Angular SPA基于
Keycloak是一个功能强大的开源身份和访问管理系统,提供了一整套解决方案,包括用户认证、单点登录(SSO)、身份联合、用户注册、用户管理、角色映射、多因素认证和访问控制等。它广泛应用于企业和云服务,可以简化和统一不同应用程序和服务的安全管理,支持自托管或云部署,适用于需要安全、灵活且易于扩展的用
3月7日,微软发布了Visual Studio 2017 RTM,与之一起发布的还有.NET Core Runtime 1.1.0以及.NET Core SDK 1.0.0,尽管这些并不是最新版,但也已经从preview版本升级到了正式版。所以,在安装Visual Studio 2017时如果启用了
在上文中,我介绍了如何在Ocelot中使用自定义的中间件来修改下游服务的response body。今天,我们再扩展一下设计,让我们自己设计的中间件变得更为通用,使其能够应用在不同的Route上。比如,我们可以设计一个通用的替换response body的中间件,然后将其应用在多个Route上。 O
不少关注我博客的朋友都知道我在2009年左右开发过一个名为Apworks的企业级应用程序开发框架,旨在为分布式企业系统软件开发提供面向领域驱动(DDD)的框架级别的解决方案,并对多种系统架构风格提供支持。这个框架的开发和维护我坚持了很久,一直到2015年,我都一直在不停地重构这个项目。目前这个项目在
好吧,这个题目我也想了很久,不知道如何用最简单的几个字来概括这篇文章,原本打算取名《Angular单页面应用基于Ocelot API网关与IdentityServer4ʺSP.NET Identity实现身份认证与授权》,然而如你所见,这样的名字实在是太长了。所以,我不得不缩写“单页面应用”几个字
在前面两篇文章中,我介绍了基于IdentityServer4的一个Identity Service的实现,并且实现了一个Weather API和基于Ocelot的API网关,然后实现了通过Ocelot API网关整合Identity Service做身份认证的API请求。今天,我们进入前端开发,设计
Ocelot是ASP.NET Core下的API网关的一种实现,在微服务架构领域发挥了非常重要的作用。本文不会从整个微服务架构的角度来介绍Ocelot,而是介绍一下最近在学习过程中遇到的一个问题,以及如何使用中间件(Middleware)来解决这样的问题。 问题描述 在上文中,我介绍了一种在Angu
在大数据处理和人工智能时代,数据工厂(Data Factory)无疑是一个非常重要的大数据处理平台。市面上也有成熟的相关产品,比如Azure Data Factory,不仅功能强大,而且依托微软的云计算平台Azure,为大数据处理提供了强大的计算能力,让大数据处理变得更为稳定高效。由于工作中我的项目
在上文中,我们讨论了事件处理器中对象生命周期的问题,在进入新的讨论之前,首先让我们总结一下,我们已经实现了哪些内容。下面的类图描述了我们已经实现的组件及其之间的关系,貌似系统已经变得越来越复杂了。其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接
在之前《在ASP.NET Core中使用Apworks快速开发数据服务》一文的评论部分,.NET大神张善友为我提了个建议,可以使用Compile As a Service的Roslyn为语法解析提供支持。在此非常感激友哥给我的建议,也让我了解了一些Roslyn的知识。使用Roslyn的一个很大的好处
很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便