filecoin源码分析之协议层心跳协议

5.1 源码信息

  • version
    • master分支 619b0eb1(2019年3月2日)
  • package
    • metrics
  • location
    • metrics/heartbeat.go
    • node/node.go

5.2 源码分析

5.2.1 数据结构

  • 定义心跳协议名称,以及连接失败时输出ERROR级别日志的时间间隔(并非连接超时时间)
    // Protocol identifier and log-throttling period used by the heartbeat service.
    const (
    	// HeartbeatProtocol is the libp2p protocol used for the heartbeat service.
    	HeartbeatProtocol = "fil/heartbeat/1.0.0"
    	// connectionFailureErrorLogPeriodMinutes is the time to wait before logging a
    	// connection failure at ERROR level. Despite the "Minutes" suffix, the value
    	// is a time.Duration of ten minutes, not a bare number of minutes.
    	connectionFailureErrorLogPeriodMinutes = 10 * time.Minute
    )
    
  • 定义心跳信息结构
    • 节点当前挖矿所基于的最重tipset(链头)
    • 节点的区块高度
    • 节点的昵称
    • 是否在区块同步中(TODO)
    • 矿工地址(如果没有挖矿,这里为零地址)
      // Heartbeat contains the information required to determine the current state of a node.
      // Heartbeats are used for aggregating information about nodes in a log aggregator
      // to support alerting and devnet visualization.
      type Heartbeat struct {
      	// Head represents the heaviest tipset the node is mining on.
      	Head string
      	// Height represents the current height of the tipset.
      	Height uint64
      	// Nickname is the nickname given to the filecoin node by the user.
      	Nickname string
      	// TODO: add when implemented
      	// Syncing is `true` iff the node is currently syncing its chain with the network.
      	// Syncing bool
      	// MinerAddress is the address of this node's active miner. Can be empty, in
      	// which case the zero address is reported.
      	MinerAddress address.Address
      }
      
  • 心跳服务结构体
    • 主机结构体:对应libp2p主机
    • 心跳配置
    • 区块头获取
    • 挖矿地址获取
    • stream锁
    • stream
      // HeartbeatService is responsible for sending heartbeats.
      type HeartbeatService struct {
      	// Host is the host used to open the heartbeat stream and to record the
      	// target's address in its peerstore.
      	Host   host.Host
      	// Config holds the heartbeat settings read by the service: BeatTarget,
      	// BeatPeriod, ReconnectPeriod and Nickname.
      	Config *config.HeartbeatConfig
      	// A function that returns the heaviest tipset
      	HeadGetter func() types.TipSet
      	// A function that returns the miner's address
      	MinerAddressGetter func() address.Address
      	// streamMu guards stream in the Stream and SetStream accessors.
      	streamMu sync.Mutex
      	// stream is the stream heartbeats are written to; it is set by Connect.
      	stream   net.Stream
      }
      
  • 定义心跳服务Option函数
    • 函数入参为心跳服务结构体指针,用于在构造心跳服务时设置或修改其字段(即函数式选项 functional options)
      // HeartbeatServiceOption is the type of the heartbeat service's functional options.
      // An option receives the service being constructed and may modify it in place.
      type HeartbeatServiceOption func(service *HeartbeatService)
      

5.2.2 方法

  • 获取心跳服务的stream实例
    // Stream returns the stream currently held by the HeartbeatService. The read is
    // performed under streamMu, so it is safe for concurrent access. The stream is
    // the libp2p connection over which heartbeat messages are sent to an aggregator.
    func (hbs *HeartbeatService) Stream() net.Stream {
    	hbs.streamMu.Lock()
    	current := hbs.stream
    	hbs.streamMu.Unlock()
    	return current
    }
    
  • 设置心跳服务的stream实例
    // SetStream replaces the stream held by the HeartbeatService with `s`. The write
    // is performed under streamMu, so it is safe for concurrent access.
    func (hbs *HeartbeatService) SetStream(s net.Stream) {
    	hbs.streamMu.Lock()
    	hbs.stream = s
    	hbs.streamMu.Unlock()
    }
    
  • 定时确认连接性,并调用运行心跳服务
    // Start runs the heartbeat service's connection loop. On every tick of
    // `hbs.Config.ReconnectPeriod` it attempts to connect to the aggregator; once a
    // connection succeeds, heartbeats are sent until sending fails, after which the
    // loop goes back to reconnecting. Start returns immediately if ReconnectPeriod
    // cannot be parsed, and otherwise does not return until `ctx` is 'Done'.
    func (hbs *HeartbeatService) Start(ctx context.Context) {
    	log.Debug("starting heartbeat service")
    	reconnectEvery, err := time.ParseDuration(hbs.Config.ReconnectPeriod)
    	if err != nil {
    		log.Errorf("invalid heartbeat reconnectPeriod: %s", err)
    		return
    	}
    	// Ticker that paces reconnection attempts.
    	ticker := time.NewTicker(reconnectEvery)
    	defer ticker.Stop()
    	// firstFailure is the time of the first connection failure since the last
    	// successful connection; it is zero initially and while connected.
    	// lastEscalation is the time of the last ERROR log (or of the first failure,
    	// before any ERROR log has been emitted).
    	var firstFailure, lastEscalation time.Time
    	for {
    		// Wait for either cancellation or the next reconnect tick.
    		select {
    		case <-ctx.Done():
    			return
    		case <-ticker.C:
    		}
    		connErr := hbs.Connect(ctx)
    		if connErr == nil {
    			firstFailure = time.Time{}
    			// Connected: send heartbeats. Run blocks until it fails to send a
    			// heartbeat or until ctx is done.
    			if runErr := hbs.Run(ctx); runErr != nil {
    				log.Warning("disconnecting from aggregator, failed to send heartbeat")
    			}
    			continue
    		}
    		// Connection failed: log once as a warning on the first failure, then as
    		// an error every 10 minutes, and at debug level in between.
    		now := time.Now()
    		report := log.Debugf
    		switch {
    		case firstFailure.IsZero():
    			// First failure since the last connection; also starts the timer
    			// for raising the log level to ERROR.
    			firstFailure, lastEscalation = now, now
    			report = log.Warningf
    		case now.Sub(lastEscalation) > connectionFailureErrorLogPeriodMinutes:
    			report = log.Errorf
    			lastEscalation = now // reset the escalation timer
    		}
    		report("Heartbeat service failed to connect for %s: %s", now.Sub(firstFailure), connErr)
    	}
    }
    
  • 运行心跳服务
    // Run is called once the heartbeat service connects to the aggregator and sends
    // the actual heartbeats: every `hbs.Config.BeatPeriod` it builds a heartbeat with
    // Beat and JSON-encodes it onto the service's stream. Run blocks until `ctx` is
    // 'Done', in which case it returns nil. It returns an error if BeatPeriod cannot
    // be parsed, or if a heartbeat fails to send; in the latter case the connection
    // to the aggregator is closed before returning.
    func (hbs *HeartbeatService) Run(ctx context.Context) error {
    	bd, err := time.ParseDuration(hbs.Config.BeatPeriod)
    	if err != nil {
    		log.Errorf("invalid heartbeat beatPeriod: %s", err)
    		return err
    	}
    	// Ticker that paces the heartbeats.
    	beatTicker := time.NewTicker(bd)
    	defer beatTicker.Stop()
    	// Take the stream through the locked accessor rather than reading the field
    	// directly, and keep using this one value so the connection closed on
    	// failure is the same one the encoder was writing to.
    	stream := hbs.Stream()
    	// TODO use cbor instead of json
    	encoder := json.NewEncoder(stream)
    	for {
    		select {
    		case <-ctx.Done():
    			return nil
    		case <-beatTicker.C:
    			// Beat period elapsed: build the heartbeat and write it to the stream.
    			hb := hbs.Beat()
    			if err := encoder.Encode(hb); err != nil {
    				// Sending failed: close the underlying connection and report.
    				stream.Conn().Close() // nolint: errcheck
    				return err
    			}
    		}
    	}
    }
    
  • 获取心跳参数
    // Beat assembles a Heartbeat from the configured nickname, the tipset returned
    // by HeadGetter (its sorted CID set as a string, and its height) and the address
    // returned by MinerAddressGetter. If the height cannot be obtained a warning is
    // logged and the heartbeat is still returned with the height value that
    // Height() produced.
    func (hbs *HeartbeatService) Beat() Heartbeat {
    	beat := Heartbeat{Nickname: hbs.Config.Nickname}
    	head := hbs.HeadGetter()
    	beat.Head = head.ToSortedCidSet().String()
    	h, err := head.Height()
    	if err != nil {
    		log.Warningf("heartbeat service failed to get chain height: %s", err)
    	}
    	beat.Height = h
    	beat.MinerAddress = hbs.MinerAddressGetter()
    	return beat
    }
    
  • 心跳流连接
    // Connect connects to `hbs.Config.BeatTarget` or returns an error. It parses the
    // target multiaddr, extracts and decodes the peer ID from its /p2p component,
    // records the remaining transport address for that peer in the host's peerstore,
    // opens a stream to the peer using HeartbeatProtocol, and stores the stream on
    // the service via SetStream.
    func (hbs *HeartbeatService) Connect(ctx context.Context) error {
    	log.Debugf("Heartbeat service attempting to connect, targetAddress: %s", hbs.Config.BeatTarget)
    	targetMaddr, err := ma.NewMultiaddr(hbs.Config.BeatTarget)
    	if err != nil {
    		return err
    	}
    	pid, err := targetMaddr.ValueForProtocol(ma.P_P2P)
    	if err != nil {
    		return err
    	}
    	peerid, err := peer.IDB58Decode(pid)
    	if err != nil {
    		return err
    	}
    	// Decapsulate the /p2p/<peerID> part from the target
    	// /ip4/<a.b.c.d>/p2p/<peer> becomes /ip4/<a.b.c.d>
    	targetPeerAddr, err := ma.NewMultiaddr(
    		fmt.Sprintf("/p2p/%s", peer.IDB58Encode(peerid)))
    	if err != nil {
    		// Do not continue with a nil multiaddr; report the failure instead of
    		// silently discarding it.
    		return err
    	}
    	targetAddr := targetMaddr.Decapsulate(targetPeerAddr)
    	hbs.Host.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
    	// Open the heartbeat stream to the target peer.
    	s, err := hbs.Host.NewStream(ctx, peerid, HeartbeatProtocol)
    	if err != nil {
    		log.Debugf("failed to open stream, peerID: %s, targetAddr: %s %s", peerid, targetAddr, err)
    		return err
    	}
    	log.Infof("successfully to open stream, peerID: %s, targetAddr: %s", peerid, targetAddr)
    	// Store the new stream on the service.
    	hbs.SetStream(s)
    	return nil
    }
    

5.2.3 函数

  • 向心跳服务结构体传参,用于设置获取矿工地址函数
    // WithMinerAddressGetter builds an option that installs `ag` as the service's
    // MinerAddressGetter when the option is applied.
    func WithMinerAddressGetter(ag func() address.Address) HeartbeatServiceOption {
    	setGetter := func(hbs *HeartbeatService) {
    		hbs.MinerAddressGetter = ag
    	}
    	return setGetter
    }
    
  • 获取默认的矿工地址
    // defaultMinerAddressGetter is the miner address getter installed by
    // NewHeartbeatService before options are applied; it returns an empty
    // address.Address value.
    func defaultMinerAddressGetter() address.Address {
    	empty := address.Address{}
    	return empty
    }
    
  • 实例化心跳服务,具体的实例化在node包中实现。
    // NewHeartbeatService builds a HeartbeatService from the given host, heartbeat
    // config and head getter. The miner address getter starts out as
    // defaultMinerAddressGetter; each option is then applied in order and may
    // override any field of the service.
    func NewHeartbeatService(h host.Host, hbc *config.HeartbeatConfig, hg func() types.TipSet, options ...HeartbeatServiceOption) *HeartbeatService {
    	svc := new(HeartbeatService)
    	svc.Host = h
    	svc.Config = hbc
    	svc.HeadGetter = hg
    	svc.MinerAddressGetter = defaultMinerAddressGetter
    	// Apply the caller's options; these can replace the defaults set above.
    	for i := range options {
    		options[i](svc)
    	}
    	return svc
    }
    

5.2.4 实例化及业务逻辑

  • 主要由node调用,location:node/node.go,主要逻辑如下
  • 在node的启动方法中,调用node.setupHeartbeatServices方法,建立心跳服务
    // Start boots up the node.
    func (node *Node) Start(ctx context.Context) error {
        ......
    	if err := node.setupHeartbeatServices(ctx); err != nil {
    		return errors.Wrap(err, "failed to start heartbeat services")
    	}
    	return nil
    }
    
  • 建立心跳服务,具体见如下注释
    // setupHeartbeatServices starts the node's heartbeat services, each in its own
    // goroutine: the primary service when a BeatTarget is configured, and one
    // additional alert service when the FIL_HEARTBEAT_ALERTS environment variable is
    // set. It always returns nil.
    func (node *Node) setupHeartbeatServices(ctx context.Context) error {
    	// Miner address getter handed to the heartbeat services.
    	minerAddr := func() address.Address {
    		// the only error miningAddress() returns is ErrNoMinerAddress.
    		// if there is no configured miner address, simply send a zero
    		// address across the wire.
    		if addr, err := node.miningAddress(); err == nil {
    			return addr
    		}
    		return address.Address{}
    	}
    	// start the primary heartbeat service, but only when a target is configured
    	if node.Repo.Config().Heartbeat.BeatTarget != "" {
    		primary := metrics.NewHeartbeatService(node.Host(), node.Repo.Config().Heartbeat, node.ChainReader.Head, metrics.WithMinerAddressGetter(minerAddr))
    		go primary.Start(ctx)
    	}
    	// check if we want to connect to an alert service. An alerting service is a heartbeat
    	// service that can trigger alerts based on the contents of heartbeats. At most one
    	// such service is started, pointed at the target given in the environment variable.
    	alertTarget := os.Getenv("FIL_HEARTBEAT_ALERTS")
    	if alertTarget == "" {
    		return nil
    	}
    	alerts := metrics.NewHeartbeatService(node.Host(), &config.HeartbeatConfig{
    		BeatTarget:      alertTarget,
    		BeatPeriod:      "10s",
    		ReconnectPeriod: "10s",
    		Nickname:        node.Repo.Config().Heartbeat.Nickname,
    	}, node.ChainReader.Head, metrics.WithMinerAddressGetter(minerAddr))
    	go alerts.Start(ctx)
    	return nil
    }