• ETCD数据库源码分析——集群间网络层服务端RaftHandler


    在《ETCD数据库源码分析——集群间网络层服务端接口》一文中,启动http.Server时会通过rafthttp.Transporter.Handler()方法为指定的URL路径添加相应的Handler实例,如下图所示。streamHandler负责处理Stream消息通道上的请求,pipelineHandler负责处理Pipeline通道上的请求,snapHandler负责接收对端节点发来的快照数据。
    在这里插入图片描述

    pipelineHandler

    pipelineHandler用于集群间网络层服务端负责处理Pipeline通道上的请求,tr为当前pipeline实例关联的rafthttp.Transport实例,r为底层的Raft实例,cid为当前集群的ID。

    // pipelineHandler is the HTTP handler for requests arriving on the pipeline
    // channel. Its ServeHTTP method reads one raftpb.Message from the request
    // body and passes it to r.Process.
    type pipelineHandler struct {
    	// lg is the logger used for warnings emitted while serving a request.
    	lg      *zap.Logger
    	// localID is passed to checkClusterCompatibilityFromHeader and is logged
    	// as "local-member-id".
    	localID types.ID
    	// tr is the transport on which the sender is registered as a remote peer
    	// (see addRemoteFromRequest).
    	tr      Transporter
    	// r receives every successfully decoded message through Process.
    	r       Raft
    	// cid is the cluster ID; it is written to the X-Etcd-Cluster-ID response
    	// header and used in the cluster compatibility check.
    	cid     types.ID
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    pipelineHandler实现了http.Handler接口的ServeHTTP方法,这是其处理请求的核心方法。该方法通过读取对端节点发来的请求得到相应的消息实例,然后将其交给底层的etcd-raft模块进行处理。

    // ServeHTTP handles a single pipeline request. It rejects anything that is
    // not a POST or that fails the cluster compatibility check, reads the request
    // body, unmarshals it into a raftpb.Message and passes that message to
    // h.r.Process. On success it replies with 204 No Content.
    func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    	// Only POST is accepted on this endpoint.
    	const method = "POST"
    	if r.Method != method {
    		w.Header().Set("Allow", method)
    		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
    		return
    	}

    	// Report this cluster's ID in the response headers.
    	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

    	compatErr := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid)
    	if compatErr != nil {
    		http.Error(w, compatErr.Error(), http.StatusPreconditionFailed)
    		return
    	}

    	// Register the sender as a remote peer if the request headers describe one.
    	addRemoteFromRequest(h.tr, r)

    	// Limit the data size that could be read from the request body, which ensures that read from
    	// connection will not time out accidentally due to possible blocking in underlying implementation.
    	payload, err := io.ReadAll(pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte))
    	if err != nil {
    		h.lg.Warn(
    			"failed to read Raft message",
    			zap.String("local-member-id", h.localID.String()),
    			zap.Error(err),
    		)
    		http.Error(w, "error reading raft message", http.StatusBadRequest)
    		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
    		return
    	}

    	// Decode the body into a raft message.
    	var msg raftpb.Message
    	if err := msg.Unmarshal(payload); err != nil {
    		h.lg.Warn(
    			"failed to unmarshal Raft message",
    			zap.String("local-member-id", h.localID.String()),
    			zap.Error(err),
    		)
    		http.Error(w, "error unmarshalling raft message", http.StatusBadRequest)
    		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
    		return
    	}

    	receivedBytes.WithLabelValues(types.ID(msg.From).String()).Add(float64(len(payload)))

    	// Hand the decoded message to the Raft implementation.
    	if err := h.r.Process(context.TODO(), msg); err != nil {
    		// An error that knows how to render itself writes its own response.
    		if wr, ok := err.(writerToResponse); ok {
    			wr.WriteTo(w)
    			return
    		}
    		h.lg.Warn(
    			"failed to process Raft message",
    			zap.String("local-member-id", h.localID.String()),
    			zap.Error(err),
    		)
    		http.Error(w, "error processing raft message", http.StatusInternalServerError)
    		w.(http.Flusher).Flush()
    		// disconnect the http stream
    		panic(err)
    	}

    	// Write StatusNoContent header after the message has been processed by raft, which facilitates the client to report MsgSnap status.
    	w.WriteHeader(http.StatusNoContent)
    }
    // addRemoteFromRequest adds a remote peer according to an http request header.
    // The peer ID is parsed from the X-Server-From header and its URLs are taken
    // from the comma-separated X-PeerURLs header. Nothing is added when the ID
    // does not parse or when X-PeerURLs is empty.
    func addRemoteFromRequest(tr Transporter, r *http.Request) {
    	from, err := types.IDFromString(r.Header.Get("X-Server-From"))
    	if err != nil {
    		return
    	}
    	urls := r.Header.Get("X-PeerURLs")
    	if urls == "" {
    		return
    	}
    	tr.AddRemote(from, strings.Split(urls, ","))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    streamHandler

    streamHandler结构体主要负责在接收到对端的网络连接之后,将其与对应的streamWriter实例进行关联。这样,streamWriter就可以开始向对端节点发送消息了。Stream消息通道维护的是HTTP长连接,主要负责传输数据量较小、发送比较频繁的消息,因此streamHandler.ServeHTTP主要目标就是将outgoingConn实例(streamWriter)与对应的peer实例绑定

    // streamHandler is the HTTP handler for stream channel requests. Its
    // ServeHTTP method validates the request, wraps the response writer in an
    // outgoingConn and attaches it to the peer that sent the request.
    type streamHandler struct {
    	lg         *zap.Logger
    	tr         *Transport // associated rafthttp Transport; supplies tr.ID and AddRemote
    	peerGetter peerGetter // its Get method looks up the peer for a given node ID
    	r          Raft       // underlying raft instance; asked whether a sender ID was removed
    	id         types.ID   // ID of this node; compared against the X-Raft-To request header
    	cid        types.ID   // ID of this cluster; sent in the X-Etcd-Cluster-ID response header
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    streamHandler实现了http.Handler接口的ServeHTTP方法,这是其处理请求的核心方法。

    // ServeHTTP handles a stream request. After validating the method, the
    // cluster compatibility headers, the stream type encoded in the URL path, the
    // sender ID and the X-Raft-To target, it replies 200, flushes the response and
    // attaches an outgoingConn wrapping the response writer to the sender's peer.
    // The call then blocks until the connection's close notifier fires.
    func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    	// Only GET is accepted on this endpoint.
    	const method = "GET"
    	if r.Method != method {
    		w.Header().Set("Allow", method)
    		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
    		return
    	}

    	w.Header().Set("X-Server-Version", version.Version)
    	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

    	// Verify that the request headers are compatible with this cluster.
    	compatErr := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid)
    	if compatErr != nil {
    		http.Error(w, compatErr.Error(), http.StatusPreconditionFailed)
    		return
    	}

    	// The directory part of the URL path selects the stream type.
    	var st streamType
    	switch path.Dir(r.URL.Path) {
    	case streamTypeMsgAppV2.endpoint(h.lg):
    		st = streamTypeMsgAppV2
    	case streamTypeMessage.endpoint(h.lg):
    		st = streamTypeMessage
    	default:
    		h.lg.Debug(
    			"ignored unexpected streaming request path",
    			zap.String("local-member-id", h.tr.ID.String()),
    			zap.String("remote-peer-id-stream-handler", h.id.String()),
    			zap.String("path", r.URL.Path),
    		)
    		http.Error(w, "invalid path", http.StatusNotFound)
    		return
    	}

    	// The last path element carries the sender's ID.
    	idStr := path.Base(r.URL.Path)
    	remote, err := types.IDFromString(idStr)
    	if err != nil {
    		h.lg.Warn(
    			"failed to parse path into ID",
    			zap.String("local-member-id", h.tr.ID.String()),
    			zap.String("remote-peer-id-stream-handler", h.id.String()),
    			zap.String("path", idStr),
    			zap.Error(err),
    		)
    		http.Error(w, "invalid from", http.StatusNotFound)
    		return
    	}

    	// Refuse streams from a sender that has been removed.
    	if h.r.IsIDRemoved(uint64(remote)) {
    		h.lg.Warn(
    			"rejected stream from remote peer because it was removed",
    			zap.String("local-member-id", h.tr.ID.String()),
    			zap.String("remote-peer-id-stream-handler", h.id.String()),
    			zap.String("remote-peer-id-from", remote.String()),
    		)
    		http.Error(w, "removed member", http.StatusGone)
    		return
    	}

    	// Look up the peer that corresponds to the sender.
    	peer := h.peerGetter.Get(remote)
    	if peer == nil {
    		// This may happen in following cases:
    		// 1. user starts a remote peer that belongs to a different cluster with the same cluster ID.
    		// 2. local etcd falls behind of the cluster, and cannot recognize the members that joined after its current progress.
    		urls := r.Header.Get("X-PeerURLs")
    		if urls != "" {
    			h.tr.AddRemote(remote, strings.Split(urls, ","))
    		}
    		h.lg.Warn(
    			"failed to find remote peer in cluster",
    			zap.String("local-member-id", h.tr.ID.String()),
    			zap.String("remote-peer-id-stream-handler", h.id.String()),
    			zap.String("remote-peer-id-from", remote.String()),
    			zap.String("cluster-id", h.cid.String()),
    		)
    		http.Error(w, "error sender not found", http.StatusNotFound)
    		return
    	}

    	// The request must be addressed to this handler's ID.
    	want := h.id.String()
    	got := r.Header.Get("X-Raft-To")
    	if got != want {
    		h.lg.Warn(
    			"ignored streaming request; ID mismatch",
    			zap.String("local-member-id", h.tr.ID.String()),
    			zap.String("remote-peer-id-stream-handler", h.id.String()),
    			zap.String("remote-peer-id-header", got),
    			zap.String("remote-peer-id-from", remote.String()),
    			zap.String("cluster-id", h.cid.String()),
    		)
    		http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
    		return
    	}

    	// Send the 200 status to the remote side right away.
    	w.WriteHeader(http.StatusOK)
    	flusher := w.(http.Flusher)
    	flusher.Flush()

    	// Wrap the response writer in an outgoingConn, attach it to the peer, and
    	// block until the close notifier fires.
    	closer := newCloseNotifier()
    	conn := &outgoingConn{
    		t:       st,
    		Writer:  w,
    		Flusher: flusher,
    		Closer:  closer,
    		localID: h.tr.ID,
    		peerID:  remote,
    	}
    	peer.attachOutgoingConn(conn)
    	<-closer.closeNotify()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    snapHandler

    snapshotHandler结构体(即前文所说的snapHandler)用来接收对端节点发来的快照数据。tr为关联的rafthttp.Transport实例,r为底层的Raft实例,cid为当前集群的ID。snapshotter负责将快照数据保存到本地文件中,参见《ETCD源码分析——snap.New函数简介》。

    // snapshotHandler is the HTTP handler that receives snapshot messages. Its
    // ServeHTTP method decodes the leading raft message, stores the snapshot
    // through snapshotter and passes the message to r.Process.
    type snapshotHandler struct {
    	// lg is the logger used while serving a request.
    	lg *zap.Logger
    	// tr is the transport on which the sender is registered as a remote peer.
    	tr Transporter
    	// r receives the decoded snapshot message through Process.
    	r Raft
    	// snapshotter stores the incoming snapshot via SaveDBFrom.
    	snapshotter *snap.Snapshotter
    	// localID is used in the cluster compatibility check and logged as
    	// "local-member-id".
    	localID types.ID
    	// cid is the cluster ID written to the X-Etcd-Cluster-ID response header.
    	cid types.ID
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ServeHTTP 服务于 HTTP 请求以接收和处理快照消息。如果请求发送者在没有关闭底层 TCP 连接的情况下死亡,则处理程序将继续等待请求正文,直到 TCP keepalive 在几分钟后发现连接断开。这是可以接受的,因为:1. 通过其他 TCP 连接发送的快照消息仍然可以被接收和处理;2. 这种情况应该很少发生,所以没有做进一步的优化。snapshotHandler.ServeHTTP方法除了读取对端节点发来的快照数据并在本地生成相应的快照文件之外,还会将快照消息(MsgSnap)通过Raft接口传递给底层的etcd-raft模块进行处理。

    // ServeHTTP serves HTTP request to receive and process snapshot message.
    //
    // The handler decodes the raft message at the start of the request body,
    // requires it to be a MsgSnap, saves the snapshot through h.snapshotter and
    // then passes the message to h.r.Process. It replies 204 No Content once
    // Process has succeeded.
    //
    // If request sender dies without closing underlying TCP connection,
    // the handler will keep waiting for the request body until TCP keepalive
    // finds out that the connection is broken after several minutes.
    // This is acceptable because
    // 1. snapshot messages sent through other TCP connections could still be
    // received and processed.
    // 2. this case should happen rarely, so no further optimization is done.
    func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    	start := time.Now()

    	// Only POST is accepted on this endpoint.
    	const method = "POST"
    	if r.Method != method {
    		w.Header().Set("Allow", method)
    		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
    		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
    		return
    	}

    	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

    	compatErr := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid)
    	if compatErr != nil {
    		http.Error(w, compatErr.Error(), http.StatusPreconditionFailed)
    		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
    		return
    	}

    	// Register the sender as a remote peer if the request headers describe one.
    	addRemoteFromRequest(h.tr, r)

    	// let snapshots be very large since they can exceed 512MB for large installations
    	decoder := &messageDecoder{r: r.Body}
    	msg, err := decoder.decodeLimit(snapshotLimitByte)
    	// The sender label is derived before the error check so that the failure
    	// path below can use it as well.
    	from := types.ID(msg.From).String()
    	if err != nil {
    		reply := fmt.Sprintf("failed to decode raft message (%v)", err)
    		h.lg.Warn(
    			"failed to decode Raft message",
    			zap.String("local-member-id", h.localID.String()),
    			zap.String("remote-snapshot-sender-id", from),
    			zap.Error(err),
    		)
    		http.Error(w, reply, http.StatusBadRequest)
    		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
    		snapshotReceiveFailures.WithLabelValues(from).Inc()
    		return
    	}

    	msgSize := msg.Size()
    	receivedBytes.WithLabelValues(from).Add(float64(msgSize))

    	// Anything other than a snapshot message is rejected.
    	if msg.Type != raftpb.MsgSnap {
    		h.lg.Warn(
    			"unexpected Raft message type",
    			zap.String("local-member-id", h.localID.String()),
    			zap.String("remote-snapshot-sender-id", from),
    			zap.String("message-type", msg.Type.String()),
    		)
    		http.Error(w, "wrong raft message type", http.StatusBadRequest)
    		snapshotReceiveFailures.WithLabelValues(from).Inc()
    		return
    	}

    	// Track this transfer as in flight until the handler returns.
    	snapshotReceiveInflights.WithLabelValues(from).Inc()
    	defer func() { snapshotReceiveInflights.WithLabelValues(from).Dec() }()

    	h.lg.Info(
    		"receiving database snapshot",
    		zap.String("local-member-id", h.localID.String()),
    		zap.String("remote-snapshot-sender-id", from),
    		zap.Uint64("incoming-snapshot-index", msg.Snapshot.Metadata.Index),
    		zap.Int("incoming-snapshot-message-size-bytes", msgSize),
    		zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
    	)

    	// save incoming database snapshot.
    	saved, err := h.snapshotter.SaveDBFrom(r.Body, msg.Snapshot.Metadata.Index)
    	if err != nil {
    		reply := fmt.Sprintf("failed to save KV snapshot (%v)", err)
    		h.lg.Warn(
    			"failed to save incoming database snapshot",
    			zap.String("local-member-id", h.localID.String()),
    			zap.String("remote-snapshot-sender-id", from),
    			zap.Uint64("incoming-snapshot-index", msg.Snapshot.Metadata.Index),
    			zap.Error(err),
    		)
    		http.Error(w, reply, http.StatusInternalServerError)
    		snapshotReceiveFailures.WithLabelValues(from).Inc()
    		return
    	}
    	receivedBytes.WithLabelValues(from).Add(float64(saved))

    	downloadTook := time.Since(start)
    	h.lg.Info(
    		"received and saved database snapshot",
    		zap.String("local-member-id", h.localID.String()),
    		zap.String("remote-snapshot-sender-id", from),
    		zap.Uint64("incoming-snapshot-index", msg.Snapshot.Metadata.Index),
    		zap.Int64("incoming-snapshot-size-bytes", saved),
    		zap.String("incoming-snapshot-size", humanize.Bytes(uint64(saved))),
    		zap.String("download-took", downloadTook.String()),
    	)

    	// Pass the MsgSnap message on to the Raft implementation.
    	if err := h.r.Process(context.TODO(), msg); err != nil {
    		// Process may return writerToResponse error when doing some additional checks before calling raft.Node.Step.
    		if wr, ok := err.(writerToResponse); ok {
    			wr.WriteTo(w)
    			return
    		}
    		reply := fmt.Sprintf("failed to process raft message (%v)", err)
    		h.lg.Warn(
    			"failed to process Raft message",
    			zap.String("local-member-id", h.localID.String()),
    			zap.String("remote-snapshot-sender-id", from),
    			zap.Error(err),
    		)
    		http.Error(w, reply, http.StatusInternalServerError)
    		snapshotReceiveFailures.WithLabelValues(from).Inc()
    		return
    	}

    	// Write StatusNoContent header after the message has been processed by raft, which facilitates the client to report MsgSnap status.
    	w.WriteHeader(http.StatusNoContent)
    	snapshotReceive.WithLabelValues(from).Inc()
    	snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
  • 相关阅读:
    面试必问的ConcurrentHashMap实现原理:数据结构、get与put操作
    云原生Kubernetes:kubectl管理命令
    SpringBoot启动流程简介
    GEE:数据预处理的细节(处理顺序。比如, select() 和 filter() 要优先于 map())
    java直播源码:如何使用Java构建一个高效的直播系统
    DPU网络开发SDK——DPDK(一)
    spring-boot-maven-plugin插件 —— 排除依赖
    Vue3学习(十九) - 使用Vue完成页面参数传递
    webpack初体验
    【nginx】代理配置
  • 原文地址:https://blog.csdn.net/asmartkiller/article/details/125469915