<p data-nodeid="3">本专栏的第二模块主要讲了 etcd 实现的核心原理，从整体剖析 etcd 的架构到依次介绍通信接口、存储机制、etcd-raft 分布式一致性、MVCC 多版本控制、事务、watch 机制、lease 租约以及 etcd 启动过程的分析。</p>
<p data-nodeid="4">这一讲我们将会介绍 etcd 服务端处理客户端请求的完整过程，在回忆之前所讲内容的同时，对服务端处理客户端请求的过程进行总结。</p>
<h3 data-nodeid="5">请求过程概述</h3>
<p data-nodeid="6">我们先来回顾一下 etcd 的整体架构。客户端访问 etcd 服务端，按照分层的架构可以划分为客户端层、API 接口层、etcd Raft 层、业务逻辑层以及 etcd 存储。</p>
<p data-nodeid="7">客户端层如 clientv3 库和 etcdctl 等工具，用户通过 RESTful 方式进行调用，降低了 etcd 的使用复杂度；API 接口层提供了客户端访问服务端的通信协议和接口定义，以及服务端节点之间相互通信的协议。etcd v3 使用 gRPC 作为消息传输协议；etcd Raft 层负责 Leader 选举和日志复制等功能，除了与本节点的 etcd server 通信之外，还与集群中的其他 etcd 节点进行交互，实现分布式一致性数据同步的关键工作；etcd 的业务逻辑层，包括鉴权、租约、KVServer、MVCC 和 Compactor 压缩等核心功能特性；etcd 存储实现了快照、预写式日志 WAL（Write Ahead Log）。etcd V3 版本中，使用 boltdb 来持久化存储集群元数据和用户写入的数据。</p>
<p data-nodeid="8">结合上述内容，我们来看 KV 请求所涉及的 etcd 各个模块之间的交互过程。</p>
<h3 data-nodeid="9">gRPC API</h3>
<p data-nodeid="10">我们先来看 etcd 服务端对外提供服务时 gRPC API 注册的过程。</p>
<p data-nodeid="11">服务端 gRPC 接口定义在 rpc.proto 文件中，这与 service KV 中的定义相对应。在 etcd 启动时，gRPC Server 也需要注册这些 KV 接口。具体启动的实现则定义在了 grpc 包下，代码如下所示：</p>
<pre class="lang-go" data-nodeid="12"><code data-language="go"><span class="hljs-comment">// 位于 etcdserver/api/v3rpc/grpc.go:38</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">Server</span><span class="hljs-params">(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption)</span> *<span class="hljs-title">grpc</span>.<span class="hljs-title">Server</span></span> {
<span class="hljs-keyword">var</span> opts []grpc.ServerOption
opts = <span class="hljs-built_in">append</span>(opts, grpc.CustomCodec(&amp;codec{}))
<span class="hljs-comment">// 创建 grpc Server</span>
grpcServer := grpc.NewServer(<span class="hljs-built_in">append</span>(opts, gopts...)...)
<span class="hljs-comment">// 注册各种服务到 gRPC Server</span>
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
hsrv := health.NewServer()
hsrv.SetServingStatus(<span class="hljs-string">""</span>, healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, hsrv)
grpc_prometheus.Register(grpcServer)
<span class="hljs-keyword">return</span> grpcServer
}
</code></pre>
<p data-nodeid="13"><code data-backticks="1" data-nodeid="89">Server</code>方法主要用于启动各种服务。上述实现中，首先构建所需要的参数以创建 gRPC Server；然后在启动时将实现 KV 各个方法的对象注册到 gRPC Server，在其上注册对应的拦截器，包括 KVServer、WatchServer、LeaseServer、ClusterServer、AuthServer 和 MaintenanceServer 等。</p>
<p data-nodeid="14">下面我们以 KVServer 为例，具体分析 etcd 服务端提供键值对读写的流程。</p>
<h3 data-nodeid="15">读请求</h3>
<p data-nodeid="16">读请求是 etcd 中的高频操作。etcd 中读取单个 key 和批量 key 的方法所使用的都是 Range。因此对于读请求，我们围绕 Range 方法分析即可。</p>
<h4 data-nodeid="17">实现细节</h4>
<p data-nodeid="18">client 发送 Range RPC 请求给 etcd 服务端之后，首先会经过 gRPC Server 上注册的拦截器拦截。我们从 gRPC 包中 KVServer 实现的 Range 方法看起：</p>
<pre class="lang-go" data-nodeid="19"><code data-language="go"><span class="hljs-comment">// 位于 etcdserver/api/v3rpc/key.go:41</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(s *kvServer)</span> <span class="hljs-title">Range</span><span class="hljs-params">(ctx context.Context, r *pb.RangeRequest)</span> <span class="hljs-params">(*pb.RangeResponse, error)</span></span> {
<span class="hljs-comment">// 检验 Range 请求的参数</span>
<span class="hljs-keyword">if</span> err := checkRangeRequest(r); err != <span class="hljs-literal">nil</span> {
<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, err
}
resp, err := s.kv.Range(ctx, r) <span class="hljs-comment">//调用 RaftKV.Range()</span>
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, togRPCError(err)
}
<span class="hljs-comment">// 使用 etcd Server 的信息填充 pb.ResponseHeader</span>
s.hdr.fill(resp.Header)
<span class="hljs-keyword">return</span> resp, <span class="hljs-literal">nil</span>
}
</code></pre>
<p data-nodeid="20">Range 请求的主要部分在于调用<code data-backticks="1" data-nodeid="97">RaftKV.Range()</code>方法。这将会调用到 etcdserver 包中对 RaftKV 的实现：</p>
<pre class="lang-go" data-nodeid="21"><code data-language="go"><span class="hljs-comment">// 位于 etcdserver/v3_server.go:90</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(s *EtcdServer)</span> <span class="hljs-title">Range</span><span class="hljs-params">(ctx context.Context, r *pb.RangeRequest)</span> <span class="hljs-params">(*pb.RangeResponse, error)</span></span> {
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
<span class="hljs-keyword">var</span> resp *pb.RangeResponse
<span class="hljs-keyword">var</span> err error
<span class="hljs-comment">// 认证校验</span>
chk := <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">(ai *auth.AuthInfo)</span> <span class="hljs-title">error</span></span> {
<span class="hljs-keyword">return</span> s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
<span class="hljs-comment">// 查询结果</span>
get := <span class="hljs-function"><span class="hljs-keyword">func</span><span class="hljs-params">()</span></span> { resp, err = s.applyV3Base.Range(ctx, <span class="hljs-literal">nil</span>, r) }
<span class="hljs-keyword">if</span> serr := s.doSerialize(ctx, chk, get); serr != <span class="hljs-literal">nil</span> {
err = serr
<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, err
}
<span class="hljs-keyword">return</span> resp, err
}
</code></pre>
<p data-nodeid="22">EtcdServer 实现的 Range 方法较为简单，主要是先进行认证校验，然后调用 applierV3 接口中定义的 Range 方法来查询结果，该方法的实现如下：</p>
<pre class="lang-go" data-nodeid="23"><code data-language="go"><span class="hljs-comment">// 位于 etcdserver/apply.go:280</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(a *applierV3backend)</span> <span class="hljs-title">Range</span><span class="hljs-params">(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest)</span> <span class="hljs-params">(*pb.RangeResponse, error)</span></span> {
trace := traceutil.Get(ctx)
resp := &amp;pb.RangeResponse{}
resp.Header = &amp;pb.ResponseHeader{}
<span class="hljs-keyword">if</span> txn == <span class="hljs-literal">nil</span> {
txn = a.s.kv.Read(trace)
<span class="hljs-keyword">defer</span> txn.End()
}
<span class="hljs-comment">// 分页大小</span>
limit := r.Limit
<span class="hljs-keyword">if</span> r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != <span class="hljs-number">0</span> || r.MaxModRevision != <span class="hljs-number">0</span> ||
r.MinCreateRevision != <span class="hljs-number">0</span> || r.MaxCreateRevision != <span class="hljs-number">0</span> {
limit = <span class="hljs-number">0</span>
}
<span class="hljs-keyword">if</span> limit &gt; <span class="hljs-number">0</span> {
<span class="hljs-comment">// 多取一个，判断分页</span>
limit = limit + <span class="hljs-number">1</span>
}
<span class="hljs-comment">// 构造 Range 请求</span>
ro := mvcc.RangeOptions{
Limit: limit,
Rev:   r.Revision,
Count: r.CountOnly,
}
<span class="hljs-comment">// 获取 Range 结果</span>
rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, err
}
<span class="hljs-comment">// 排序</span>
sortOrder := r.SortOrder
<span class="hljs-keyword">if</span> r.SortTarget != pb.RangeRequest_KEY &amp;&amp; sortOrder == pb.RangeRequest_NONE {
sortOrder = pb.RangeRequest_ASCEND
}
}
<span class="hljs-comment">// 分页取</span>
<span class="hljs-keyword">if</span> r.Limit &gt; <span class="hljs-number">0</span> &amp;&amp; <span class="hljs-built_in">len</span>(rr.KVs) &gt; <span class="hljs-keyword">int</span>(r.Limit) {
rr.KVs = rr.KVs[:r.Limit]
resp.More = <span class="hljs-literal">true</span>
}
resp.Header.Revision = rr.Rev
resp.Count = <span class="hljs-keyword">int64</span>(rr.Count)
resp.Kvs = <span class="hljs-built_in">make</span>([]*mvccpb.KeyValue, <span class="hljs-built_in">len</span>(rr.KVs))
<span class="hljs-keyword">for</span> i := <span class="hljs-keyword">range</span> rr.KVs {
<span class="hljs-keyword">if</span> r.KeysOnly {
rr.KVs[i].Value = <span class="hljs-literal">nil</span>
}
resp.Kvs[i] = &amp;rr.KVs[i]
}
<span class="hljs-comment">// 组装响应</span>
<span class="hljs-keyword">return</span> resp, <span class="hljs-literal">nil</span>
}
</code></pre>
<p data-nodeid="24">applierV3backend 中的 Range 在实现时，首先准备分页的大小，多取一个，用于判断分页是否存在下一页。随后构造 Range 请求，调用 mvcc 包中的 Range 方法获取结果，最后对结果进行排序并将结果返回给客户端，由于当前的 mvcc.Range 实现返回按<strong data-nodeid="107">字典序升序</strong>的结果，因此默认情况下仅当目标不是<code data-backticks="1" data-nodeid="105">KEY</code>时才进行升序排序。</p>
<h4 data-nodeid="25">读请求过程描述</h4>
<p data-nodeid="26">经过上述代码分析，我们可以总结客户端发起读请求之后的处理流程，如下图所示：</p>
<p data-nodeid="1160" class=""><img src="https://s0.lgstatic.com/i/image6/M00/24/6A/Cgp9HWBYZxOAHAwTAAArpPTecUY840.png" alt="Drawing 0.png" data-nodeid="1164"></p>
<div data-nodeid="1161"><p style="text-align:center">客户端发起读请求后的处理流程图</p></div>



<ul data-nodeid="29">
<li data-nodeid="30">
<p data-nodeid="31">客户端发起请求之后，clientv3 首先会根据负载均衡算法选择一个合适的 etcd 节点，接着调用 KVServer 模块对应的 RPC 接口，发起 Range 请求的 gRPC 远程调用；</p>
</li>
<li data-nodeid="32">
<p data-nodeid="33">gRPC Server 上注册的拦截器拦截到 Range 请求，实现 Metrics 统计、日志记录等功能；</p>
</li>
<li data-nodeid="34">
<p data-nodeid="35">然后进入读的主要过程，etcd 模式实现了线性读，使得任何客户端通过线性读都能及时访问到键值对的更新；</p>
</li>
<li data-nodeid="36">
<p data-nodeid="37">线性读获取到 Leader 已提交日志索引构造的最新 ReadState 对象，实现本节点状态机的同步；</p>
</li>
<li data-nodeid="38">
<p data-nodeid="39">接着就是调用 MVCC 模块，根据 treeIndex 模块 B-tree 快速查找 key 对应的版本号；</p>
</li>
<li data-nodeid="40">
<p data-nodeid="41">通过获取的版本号作为 key，查询存储在 boltdb 中的键值对，我们在之前的存储部分讲解过此过程。</p>
</li>
</ul>
<h3 data-nodeid="42">写请求</h3>
<p data-nodeid="43">put 操作用于插入或者更新指定的键值对。下面我们具体看一下写请求的整体流程。</p>
<p data-nodeid="44">put 方法更新或者写入键值对到 etcd 中，相比于读请求，多了一步<strong data-nodeid="127">Quota 配额检查存储空间</strong>的情况，用来检查写入时是否有足够的空间。实际执行时只会针对 Put 和 Txn 操作，其他的请求如 Range 则会直接调用对应的 handler。</p>
<pre class="lang-go" data-nodeid="45"><code data-language="go"><span class="hljs-comment">// etcdserver/api/v3rpc/quota.go:59</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(s *quotaKVServer)</span> <span class="hljs-title">Put</span><span class="hljs-params">(ctx context.Context, r *pb.PutRequest)</span> <span class="hljs-params">(*pb.PutResponse, error)</span></span> {
<span class="hljs-keyword">if</span> err := s.qa.check(ctx, r); err != <span class="hljs-literal">nil</span> {
<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, err
}
<span class="hljs-keyword">return</span> s.KVServer.Put(ctx, r)
}
</code></pre>
<p data-nodeid="46"><code data-backticks="1" data-nodeid="128">check</code>方法将检查请求是否满足配额。如果空间不足，将会忽略请求并发出可用空间不足的警报。根据 put 方法的调用过程，我们可以列出如下的主要方法：</p>
<pre class="lang-plain" data-nodeid="47"><code data-language="plain">quotaKVServer.Put() api/v3rpc/quota.go 首先检查是否满足需求
|-quotoAlarm.check() 检查
|-KVServer.Put() api/v3rpc/key.go 真正的处理请求
|-checkPutRequest() 校验请求参数是否合法
|-RaftKV.Put() etcdserver/v3_server.go 处理请求
|=EtcdServer.Put() 实际调用的是该函数
| |-raftRequest()
|   |-raftRequestOnce()
|     |-processInternalRaftRequestOnce() 真正开始处理请求
|       |-context.WithTimeout() 创建超时的上下文信息
|       |-raftNode.Propose() raft/node.go
|         |-raftNode.step() 对于类型为 MsgProp 类型消息，向 propc 通道中传入数据
|-header.fill() etcdserver/api/v3rpc/header.go 填充响应的头部信息
</code></pre>
<p data-nodeid="48"><code data-backticks="1" data-nodeid="130">KVServer.Put()</code>的实现位于 api/v3rpc/key.go，用来真正地处理客户端请求。</p>
<pre class="lang-go" data-nodeid="49"><code data-language="go"><span class="hljs-comment">// etcdserver/v3_server.go:130</span>
<span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(s *EtcdServer)</span> <span class="hljs-title">Put</span><span class="hljs-params">(ctx context.Context, r *pb.PutRequest)</span> <span class="hljs-params">(*pb.PutResponse, error)</span></span> {
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
<span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span>, err
}
<span class="hljs-keyword">return</span> resp.(*pb.PutResponse), <span class="hljs-literal">nil</span>
}
</code></pre>
<p data-nodeid="50">将数据写入集群，涉及的内容比较复杂，还包括集群的通信。通过封装的 raftRequest，此时已经将添加记录的请求发送到了 Raft 协议的核心层处理。</p>
<p data-nodeid="51">我们来总结一下写请求的处理流程，如下图所示：</p>
<p data-nodeid="1816" class="te-preview-highlight"><img src="https://s0.lgstatic.com/i/image6/M00/24/6A/Cgp9HWBYZyCAVzx8AABGy0wQ5sA031.png" alt="Drawing 1.png" data-nodeid="1820"></p>
<div data-nodeid="1817"><p style="text-align:center">写请求的处理流程图</p></div>



<ul data-nodeid="54">
<li data-nodeid="55">
<p data-nodeid="56">客户端发送写请求，通过负载均衡算法选取合适的 etcd 节点，发起 gRPC 调用。</p>
</li>
<li data-nodeid="57">
<p data-nodeid="58">etcd server 的 gRPC Server 收到这个请求，经过 gRPC 拦截器拦截，实现 Metrics 统计和日志记录等功能。</p>
</li>
<li data-nodeid="59">
<p data-nodeid="60">Quota 模块配额检查 db 的大小，如果超过会报<code data-backticks="1" data-nodeid="141">etcdserver: mvcc: database space exceeded</code>的告警，通过 Raft 日志同步给集群中的节点 db 空间不足，同时告警也会持久化到 db 中。etcd 服务端拒绝写入，对外提供<strong data-nodeid="147">只读</strong>的功能。</p>
</li>
<li data-nodeid="61">
<p data-nodeid="62">配额检查通过，KVServer 模块经过限速、鉴权、包大小判断之后，生成唯一的编号，这时才会将写请求封装为提案消息，提交给 Raft 模块。</p>
</li>
<li data-nodeid="63">
<p data-nodeid="64">写请求的提案只能由 Leader 处理，获取到 Raft 模块的日志条目之后，Leader 会广播提案内容。WAL 模块完成 Raft 日志条目内容封装，当集群大多数节点完成日志条目的持久化，即将提案的状态变更为已提交，可以执行提案内容。</p>
</li>
<li data-nodeid="65">
<p data-nodeid="66">Apply 模块用于执行提案，首先会判断该提案是否被执行过，如果已经执行，则直接返回结束；未执行过的情况下，将会进入 MVCC 模块执行持久化提案内容的操作。</p>
</li>
<li data-nodeid="67">
<p data-nodeid="68">MVCC 模块中的 treeIndex 保存了 key 的历史版本号信息，treeIndex 使用 B-tree 结构维护了 key 对应的版本信息，包含了全局版本号、修改次数等属性。版本号代表着 etcd 中的逻辑时钟，<strong data-nodeid="156">启动时默认的版本号为 1</strong>。键值对的修改、写入和删除都会使得版本号全局单调递增。写事务在执行时，首先根据写入的 key 获取或者更新索引，如果不存在该 key，则会给予当前最大的 currentRevision 自增得到 revision；否则直接根据 key 获取 revision。</p>
</li>
<li data-nodeid="69">
<p data-nodeid="70">根据从 treeIndex 中获取到 revision 、修改次数等属性，以及 put 请求传递的 key-value 信息，作为写入到 boltdb 的 value，而将 revision 作为写入到 boltdb 的 key。同时为了读请求能够获取最新的数据，etcd 在写入 boltdb 时也会同步数据到 buffer。因此上文介绍 etcd 读请求的过程时，会<strong data-nodeid="162">优先从 buffer 中读取</strong>，读取不到的情况下才会从 boltdb 读取，以此来保证一致性和性能。为了提高吞吐量，此时提案数据并未提交保存到 db 文件，而是由 backend 异步 goroutine 定时将批量事务提交。</p>
</li>
<li data-nodeid="71">
<p data-nodeid="72">Server 通过调用网络层接口返回结果给客户端。</p>
</li>
</ul>
<p data-nodeid="73">总的来说，这个过程为客户端发起写请求，由 Leader 节点处理，经过拦截器、Quota 配额检查之后，KVServer 提交一个写请求的提案给 Raft 一致性模块，经过 RaftHTTP 网络转发，集群中的其他节点半数以上持久化成功日志条目，提案的状态将会变成已提交。接着 Apply 通过 MVCC 的 treeIndex、boltdb 执行提案内容，成功之后更新状态机。</p>
<h3 data-nodeid="74">小结</h3>
<p data-nodeid="75">这一讲我们主要介绍了服务端处理客户端读写请求的过程。</p>
<p data-nodeid="76">etcd 服务端的 Raft、KVServer、Quota、WAL、MVCC 和 Apply 等多个模块共同保障了 etcd 的读写一致性以及正确性。通过读写请求过程的学习，你可以熟悉 etcd 各个模块之间的交互，以及分布式组件设计的原理和方法。</p>
<p data-nodeid="77">结合这一讲的内容，你知道如果写请求在执行提案时出现突然宕机，etcd 重启之后是如何保证数据的一致性的吗？欢迎你在留言区提出自己的观点。下一模块我们将会介绍 etcd 的一些实践应用，比如分布式锁、Leader 选举等。</p>