研究Fabric中Etcd的Raft应用

简单回顾一下Etcd中的Wal

先看一下etcd raft library中的rafte示例结构():

image

从图中可以看出,Etcd的raft示例的大致流程:

  • 首先需要启动节点RaftNode
  • 应用层,通过proprseC 和 confChangeC 通知RaftNode新的提案或者配置变更。
  • 一个RaftNode都会启动一个独立的后台goroutine来完成回放WAL日志、启动网络组件等初始化操作。

长安链中的Raft应用

  1. 通过onMessage接收来自核心引擎的提案
  2. 处理提案,并推送RaftNode: raftNode.Propose()
  3. 等待Raft状态机ready
  4. 收到ready时存储wal和触发snapshot,向从节点sendMessage并raftNode.Advance()通知应用层已经保存进度到最后一个Ready
  5. punishEntry并提交区块到存储
  6. 从节点收到message后,推进状态机。raftNode.Step()

长安链Raft流程

Fabric中的Raft应用

Fabric中的共识介绍

Fabric的共识服务设计成了可插拔的模块,以此满足了根据不同应用场景切换不同共识选项的需求。在Hyperledger Fabric最新版本中,Fabric系统的共识模块中实现了三种共识算法,其中包括Solo,Kafka以及Raft算法。官方推荐的是使用Raft共识算法,但是为了更好地理解Fabric中的共识模块,我们也简单介绍一下Solo和Kafka这两种共识算法。

  • solo共识:假设网络环境中只有一个排序节点,从Peer节点发送来的消息由一个排序节点进行排序和产生区块。由于排序服务只有一个排序节点为所有的peer节点服务,虽然可以肯定保证顺序一致性,但是没有高可用性和可扩展性,所以不适合用于生产环境,只能用于开发和测试环境。
  • Kafka共识:Kafka是一个分布式的流式信息处理平台,目标是为实时数据提供统一的、高吞吐、低延迟的性能。Hyperledger Fabric之前版本的核心共识算法通过Kafka集群实现,简单来说,就是通过Kafka对所有交易信息进行排序(如果系统存在多个通道,则对每个通道分别排序)。
  • Raft共识:Raft是Hyperledger Fabric在1.4.1版本中引入的,它是一种基于 etcd 的崩溃容错(CFT)排序服务。Raft 遵循 "领导者和追随者" 模型,其中领导者在通道中的排序节点之间动态选出(这个节点集合称为"consenter set"),该领导者将消息复制到跟随者节点。Raft保证即使在小部分(≤ (N-1)/2)节点故障的情况下,系统仍然能正常对外提供服务,所以Raft被称为"崩溃容错"。

其实,Hyperledger Fabric在1.4.1版本以前,它的核心共识算法通过Kafka集群实现,但是在1.4.1版本之后,Fabric推荐使用Raft算法实现节点的共识。其实从提供服务的视角来看,基于Raft和Kafka的排序服务是类似的,他们都是基于CFT(crash fault tolerant)模型的排序服务,并且都使用了主从节点的设置。但是为什么Hyperledger Fabric选择Raft算法呢?我们列举了Raft相较于Kafka所展现出的优势来回答这个问题。

  • 第一点,Raft 更容易设置。虽然 Kafka 有很多崇拜者,但即使是那些崇拜者也(通常)会承认部署 Kafka 集群及其所必须的 ZooKeeper 集群会很棘手,需要在 Kafka 基础设施和设置方面拥有高水平的专业知识。此外,使用 Kafka 管理的组件比使用 Raft 管理的组件多,Kafka 有自己的版本,必须与排序节点协调。而使用 Raft,所有内容都会嵌入到排序节点中。
  • 第二点,Kafka和zookeeper的设计不适用于大型网络。它们的设计是CFT模型,但局限于运行在比较紧密的主机上。也就是说,需要有一个组织专门运行Kafka集群。鉴于此,当有多个组织使用基于Kafka排序服务的时候,其实没有实现去中心化,因为所有的节点连接的都是由一个组织单独控制的Kafka集群。如果使用Raft算法,每个组织可以贡献排序节点,共同组成排序服务,可以更好的去中心化。
  • 第三点,Raft是原生支持的,而Kafka需要经过复杂的步骤部署,并且需要单独学习成本。而且Kafka和Zookeeper的支持相关的issue要通过apache来处理,而不是Hyperledger Fabric。Raft的实现是包含在Fabric社区的,开发支持更加便利。
  • 第四点,Raft 是向开发拜占庭容错(BFT)排序服务迈出的第一步。正如我们将看到的,Fabric 开发中的一些决策是由这个驱动的。Fabric使用Raft共识算法是向BFT类算法过渡的步骤。

结构的定义

Hyperledger Fabric对Raft算法的核心实现代码都是放在fabric/orderer/consensus/etcdraft包下的,这里主要包含几个核心的数据结构.

  • Chain接口
  • Chain结构体
  • node结构体

首先,Chain接口的定义在fabric/orderer/consensus/etcdraft/consensus.go文件下,它主要定义了排序节点对接收到的客户端发送来的消息的处理操作,它的详细定义如下:

// Chain defines a way to inject messages for ordering.
type Chain interface {
    // 负责对普通交易消息进行处理排序。当排序服务在 broadCast 接口收到消息进行校验和过滤之后,就交由对应 Chain 实例进行处理。
	Order(env *cb.Envelope, configSeq uint64) error
	Configure(config *cb.Envelope, configSeq uint64) error
	WaitReady() error
	Errored() <-chan struct{}
   // Start()负责启动此 Chain 服务。
	Start()
	Halt()
}

其次,Chain结构体实现了Chain接口,它里面主要定义了一些通道(channel)用于节点间的通信,以便根据通信消息做相应的操作。

// Chain implements consensus.Chain interface.
type Chain struct {
   configurator Configurator
   rpc RPC // 节点与外部节点进行通信的对象,RPC 是一个接口,包含两个方法SendConsensus 和 SendSubmit。前面这种用于节点间 raft 信息的通讯,后者用于转发交易请求给 leader 节点。
   raftID    uint64
   channelID string
   lastKNownleader uint64
   ActiveNodes     atomic.Value
   submitC  chan *submit // 接收 Orderer 客户端提交的共识请求消息的通道
   applyC   chan apply // 接收 raft 节点间应用消息的通道
   observeC chan<- raft.softState
   haltC    chan struct{}
   doneC    chan struct{}
   startC   chan struct{}
   snapC    chan *raftpb.Snapshot //接收 raft 节点快照数据的通道
   gcC      chan *gc
   …
   Node *node // 封装了底层 raft 库的节点实例
   …
}

最后,node结构体主要用于将Fabric自己实现的Raft上层应用和etcd的底层Raft实现连接起来,可以说node结构体是它们之间通信的桥梁,正是它的存在屏蔽了Raft实现的细节。

type node struct {
   chainID string
   logger  *flogging.FabricLogger
   metrics *Metrics
   unreachableLock sync.RWMutex
   unreachable     map[uint64]struct{}
   tracker *Tracker
   storage *RaftStorage
   config  *raft.Config
   rpc RPC
   chain *Chain // 前面定义的Fabric自己实现的Chain结构体
   tickInterval time.Duration
   clock        clock.Clock
   Metadata *etcdraft.BlockMetadata
   subscriberC chan chan uint64
   raft.Node // etcd底层的Raft中的节点接口
}

Raft启动

Raft的启动入口位于fabric/orderer/consensus/etcdraft/chain.go文件中,在Chain的Start()方法中会启动etcdraft/node.go中的node.start(),而node.start()方法中进而启动etcd已经封装好的raft.StartNode()方法

// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
    
    ...
    
    //启动Node节点
    c.Node.start(c.fresh, isJoin)

	close(c.startC)
	close(c.errorC)

	go c.gc()  //从MemoryStorage中获取快照,并将其持久化到wal和磁盘。
	go c.run()

	es := c.newevictionSUSPECTor()

	interval := DefaultleaderlessCheckInterval
	if c.opts.leaderCheckInterval != 0 {
		interval = c.opts.leaderCheckInterval
	}

	c.periodicChecker = &PeriodicCheck{
		Logger:        c.logger,
		Report:        es.confirmSuspicion,
		ReportCleared: es.clearSuspicion,
		CheckInterval: interval,
		Condition:     c.SUSPECTeviction,
	}
	c.periodicChecker.Run()

}

Chain中的Start方法主要完成了启动etcdraft.Node端的循环来初始化Raft集群节点。而且Chain里面通过调用c.run()实现了通过循环处理客户端和Raft底层发送的消息。

我们再来看etcdraft.Node端的Start方法,它作为Chain端和raft/node端的桥梁,会根据Chain中传递的元数据配置信息获取启动Raft节点的ID信息,并且调用底层的Raft.StartNode方法启动节点,并且像Chain端中一样会启动n.run()来循环处理消息。

func (n *node) start(fresh, join bool) {
   …
   var campaign bool
   if fresh {// 是否是新节点标记位
      if join {// 是否是新加入节点标记位
         raftPeers = nil
         n.logger.Info("Starting raft node to join an existing channel")
      } else {
         n.logger.Info("Starting raft node as part of a new channel")
         sha := sha256.Sum256([]byte(n.chainID))
         number, _ := proto.DecodeVarint(sha[24:])
         if n.config.ID == number%uint64(len(raftPeers))+1 {
            campaign = true
         }
      }
      // 调用raft/node中的启动节点函数,初始化raft
      n.Node = raft.StartNode(n.config, raftPeers)
   } else {
      n.logger.Info("Restarting raft node")
      n.Node = raft.RestartNode(n.config)
   }
   n.subscriberC = make(chan chan uint64)
// run方法中会启动一个循环用来接收raft节点发来的消息,在这里经过进一步处理后,转发给Chain层进行处理,消息的转发机制都是通过通道来完成的。
   go n.run(campaign)
}

最后,在etcdraft/node中启动的raft.StartNode()表示进一步启动了Raft底层的Node节点,在这里会进行Raft的初始化,读取配置启动各个节点以及初始化logindex等。与前面的启动流程一样,它同样会开启一个run方法以循环的方法不断监听各通道的信息来实现状态的切换和做出相应的动作。

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
   if len(peers) == 0 {
      panic("no peers given; use RestartNode instead")
   }
   rn, err := NewRawNode(c)
   if err != nil {
      panic(err)
   }
   rn.Bootstrap(peers)

   n := newNode(rn)

   go n.run()
   return &n
}

chain.run

里面定义了,成为主节点后,监听chan *common.Block通道的proposal信息,并将proposal信息提交给etcd的Raft.Propose()

func (c *Chain) run() {
    ...
    
    becomeleader := func() (chan<- *common.Block, context.CancelFunc) {
        ch := make(chan *common.Block, c.opts.MaxInflightBlocks)
        go func(ctx context.Context, ch <-chan *common.Block) {
            for {
                select {
                case b := <-ch:   //监听通道,接受Proposal的区块信息
                	data := protoutil.MarshalOrPanic(b)
                	if err := c.Node.Propose(ctx, data); err != nil {
                		c.logger.Errorf("Failed to propose block [%d] to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                		return
                	}
                	c.logger.Debugf("Proposed block [%d] to raft consensus", b.Header.Number)
                
                case <-ctx.Done():
                	c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
                	return
                }
            }
        }(ctx, ch)

        }

    ...
}

n.run()

Raft的业务层处理逻辑,和chainmaker类似。主要是对etcd的raft状态机吐出的ready的处理。

func (n *node) run(campaign bool) {
    for {
        select {
        case <-raftTicker.C():
            // grab raft Status before ticking it, so `RecentActive` attributes
            // are not reset yet.
            status := n.Status()

            n.Tick()
            n.tracker.Check(&status)

        case rd := <-n.Ready():
            startStoring := n.clock.Now()
            // 第一步,存wal
            if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
                n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
            }
            duration := n.clock.Since(startStoring).Seconds()
            n.metrics.DataPersistDuration.Observe(float64(duration))
            if duration > halfElectionTimeout {
                n.logger.Warningf("WAL sync took %v seconds and the network is configured to start elections after %v seconds. Your disk is too slow and may cause loss of quorum and trigger leadership election.", duration, electionTimeout)
            }

            if !raft.IsEmptySnap(rd.Snapshot) {
                n.chain.snapC <- &rd.Snapshot
            }

            if notifyleaderChangeC != nil && rd.softState != nil {
                if l := atomic.LoadUint64(&rd.softState.Lead); l != raft.None {
                    select {
                    case notifyleaderChangeC <- l:
                    default:
                    }

                    notifyleaderChangeC = nil
                }
            }

            // skip empty apply
            if len(rd.CommittedEntries) != 0 || rd.softState != nil {
                n.chain.applyC <- apply{rd.CommittedEntries, rd.softState}
            }

            if campaign && rd.softState != nil {
                leader := atomic.LoadUint64(&rd.softState.Lead) // etcdraft requires atomic access to this var
                if leader != raft.None {
                    n.logger.Infof("leader %d is present, quit campaign", leader)
                    campaign = false
                    close(elected)
                }
            }

            n.Advance() 

            // Todo(jay_guo) leader can write to disk in parallel with replicating
            // to the followers and them writing to their disks. Check 10.2.1 in thesis
            n.send(rd.Messages)

        case notifyleaderChangeC = <-n.subscriberC:

        case <-n.chain.haltC:
            raftTicker.Stop()
            n.Stop()
            n.storage.Close()
            n.logger.Infof("Raft node stopped")
            close(n.chain.doneC) // close after all the artifacts are closed
            return
        }
    }

}

Fabric Raft机制的交易处理流程

1. 提交提案

首先,客户端将会把已经背书的交易提案以broadcast请求的形式转发给Raft集群的leader进行处理。我们在第二节中也提到了,Fabric中的交易可以分为两类,一类是普通交易,另一类是部署交易(也叫做配置交易)。这两类请求将分别调用不同的函数,即Order和Configure函数来完成交易提案的提交。

// Order submits normal type transactions for ordering.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
   c.Metrics.normalProposalsReceived.Add(1)
   return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}
// Configure submits config type transactions for ordering.
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
   c.Metrics.ConfigProposalsReceived.Add(1)
   return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}

2. 转发交易提案到leader

我们从上面的源代码中可以注意到,不论是何种交易类型,里面都会调用Submit方法来提交交易提案。在Submit方法中,主要做的事就是将请求消息封装为结构体并且写入指定的一个通道中(submitC)以便传递给Chain进行处理。此外,它还会判断当前节点是否是leader,如果不是,还会将消息重定向leader节点。

func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
…
   leadC := make(chan uint64, 1)
   select {
   case c.submitC <- &submit{req, leadC}: // 将消息封装并且写入submitC通道
      lead := <-leadC
      if lead == raft.None {
         c.Metrics.ProposalFailures.Add(1)
         return errors.Errorf("no Raft leader")
      }
      if lead != c.raftID { // 当前节点不是leader,则转发消息leader
         if err := c.forwardToleader(lead, req); err != nil {
            return err
         }
      }
   …
   return nil
}

3. 对交易排序

Chain端从submitC通道中将不断接收交易并将它们进行排序处理。

在ordered方法中,将根据不同类型的消息执行不同的排序操作。对于接收到是通道配置消息,比如通道创建、通道配置更新等。先调用ConsensusSupport对配置消息进行检查和应用,然后直接调用 BlockCutter.Cut() 对报文进行切块,这是因为配置信息都是单独成块;而对于普通交易消息,则直接校验之后,调用 BlockCutter.Ordered() 进入缓存排序,并根据出块规则决定是否出块。

func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]* common.Envelope, pending bool, err error) {
  if c.isConfig(msg.Payload) {
      // 配置消息
      …
      batch := c.support.BlockCutter().Cut()
      batches = [][]*common.Envelope{}
      if len(batch) != 0 {
         batches = append(batches, batch)
      }
      batches = append(batches, []*common.Envelope{msg.Payload})
      return batches, false, nil
   }
   // 普通交易信息
   if msg.LastValidationSeq < seq {
     …
   }
   batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
   return batches, pending, nil
}

4. 打包区块

交易消息经c.ordered处理之后,会得到由BlockCutter返回的数据包bathches(可打包成块的数据)和缓存是否还有数据的信息。如果缓存还有余留数据未出块,则启动计时器,否则重置计时器,这里的计时器由case timer.C处理。

接下来,将会调用propose方法来打包交易为区块。propose会根据batches数据包调用createNextBlock打包出block ,并将block传递给c.ch通道(只有leader具有propose的权限)。而如果当前交易是配置信息,还需要标记处当前正在进行配置更新的状态。

func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
   for _, batch := range batches {
      b := bc.createNextBlock(batch) // 根据当前批次创建一个区块
      c.logger.Infof("Created block [%d], there are %d blocks in flight", b.Header.Number, c.blockInflight)
      select {
      case ch <- b: // 将block传递给c.ch通道,leader可以通过这个通道收到这个区块
      default:
         c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
      }
      // if it is config block, then we should wait for the commit of the block
      if protoutil.IsConfigBlock(b) {
         c.configInflight = true
      }
      c.blockInflight++
   }
}

5. Raft对区块的共识

leader将会前面说的区块通过调用c.Node.Propose将数据传递给底层Raft状态机。这里的Propose就是提议将数据写入到各节点的日志中,这里也是实现节点间共识的入口方法

Propose就是将日志广播出去,要所有节点都尽量保存起来,但还没有提交,等到leader收到半数以上的节点都响应说已经保存完了,leader这时就可以提交了,下一次Ready的时候就会带上committedindex。

func (n *node) Propose(ctx context.Context, data []byte) error {
   return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

6. 保存区块

经过Raft共识后,节点需要将区块写入到本地,这里Raft底层会通过通道的方式传递保存区块到本地的消息(即CommittedEntries不为空的消息)。在这里,Fabric通过实现apply方法完成了保存区块的功能

func (c *Chain) apply(ents []raftpb.Entry) {
   …
   for i := range ents {
      switch ents[i].Type {
      case raftpb.Entrynormal:// 如果是普通entry消息
         …
         block := protoutil.UnmarshalBlockOrPanic(ents[i].Data)
         c.writeBlock(block, ents[i].Index) // 写入区块到本地
 c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
      case raftpb.EntryConfChange:// 如果是配置entry消息
         var cc raftpb.ConfChange
         if err := cc.Unmarshal(ents[i].Data); err != nil {
            c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
            continue
         }
         c.confState = *c.Node.ApplyConfChange(cc)
         switch cc.Type {
         case raftpb.ConfChangeAddNode:
            c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
         case raftpb.ConfChangeRemoveNode:
            c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
         default:
            c.logger.Panic("Programming error, encountered unsupported raft config change")
         }
…
      if ents[i].Index > c.appliedindex {
         c.appliedindex = ents[i].Index
      }
   }
}

原文地址:https://cloud.tencent.com/developer/article/2065530

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340